摘要
Dremio是一个现代化的数据湖引擎,提供高性能的SQL查询能力和数据虚拟化服务。本文深入分析Dremio的技术架构,涵盖其核心组件、设计模式、技术栈选型以及系统实现细节,为开发者提供全面的技术参考和学习指南。
1. 项目应用场景
Dremio作为数据湖引擎,主要应用于以下场景:
数据湖查询加速:通过反射(Reflection)技术对数据湖中的数据进行预聚合和索引,显著提升查询性能
多数据源联邦查询:支持跨多种数据源(Hadoop、S3、Azure、GCS、关系型数据库等)的统一SQL查询
数据虚拟化:提供数据的逻辑视图,无需物理移动数据即可实现数据整合和分析
自助式数据分析:为业务用户提供直观的数据探索和分析界面
实时数据处理:支持流式数据处理和近实时分析
2. 学习目标
结合Dremio的核心代码,本文设定以下学习目标:
理解分布式查询引擎架构:掌握Dremio的查询执行、优化和分布式计算机制学习存储插件设计模式:了解如何设计可扩展的数据源连接器架构掌握查询优化技术:学习基于Apache Calcite的查询优化器实现理解数据加速技术:掌握反射(Reflection)和物化视图的实现原理学习微服务架构设计:了解Dremio的服务化架构和组件间通信机制
3. 目录结构分析
dremio-oss/
├── build-tools/ # 构建工具和配置
├── client/ # 客户端相关代码
│ ├── base/ # 基础客户端功能
│ └── jdbc/ # JDBC驱动实现
├── common/ # 通用工具和基础类
│ ├── legacy/ # 遗留通用代码
│ └── src/ # 核心通用功能
├── dac/ # 数据分析中心(Data Analytics Center)
│ ├── backend/ # DAC后端服务
│ ├── daemon/ # 守护进程实现
│ └── ui/ # Web用户界面
├── distribution/ # 发布和部署相关
│ └── resources/ # 启动脚本和配置文件
├── plugins/ # 存储插件集合
│ ├── common/ # 插件通用功能
│ ├── hive/ # Hive连接器
│ ├── s3/ # S3存储插件
│ ├── azure/ # Azure存储插件
│ ├── gcs/ # Google Cloud Storage插件
│ └── elasticsearch/ # Elasticsearch插件
├── sabot/ # 查询执行引擎核心
│ ├── kernel/ # 执行引擎内核
│ ├── logical/ # 逻辑计划处理
│ └── serializer/ # 序列化组件
├── services/ # 核心服务组件
│ ├── accelerator/ # 查询加速服务
│ ├── coordinator/ # 集群协调服务
│ ├── datastore/ # 数据存储服务
│ ├── jobs/ # 作业管理服务
│ └── namespace/ # 命名空间服务
└── ui/ # 前端用户界面
└── src/ # React前端代码
4. 关键文件清单
4.1 核心启动文件
– 主守护进程
dac/daemon/src/main/java/com/dremio/dac/daemon/DremioDaemon.java
– 启动脚本
distribution/resources/src/main/resources/bin/dremio
4.2 查询引擎核心
– 查询规划器
sabot/kernel/src/main/java/com/dremio/exec/planner/
– 查询执行器
sabot/kernel/src/main/java/com/dremio/exec/work/
– Sabot执行引擎
sabot/kernel/src/main/java/com/dremio/sabot/
4.3 存储插件架构
– 存储插件管理
sabot/kernel/src/main/java/com/dremio/exec/catalog/ManagedStoragePlugin.java
– 存储插件接口
sabot/kernel/src/main/java/com/dremio/exec/store/StoragePlugin.java
4.4 服务组件
– 作业服务
services/jobs/src/main/java/com/dremio/service/jobs/JobsService.java
– 协调服务
services/coordinator/src/main/java/com/dremio/service/coordinator/
– 命名空间服务
services/namespace/src/main/java/com/dremio/service/namespace/
4.5 配置文件
– Maven主配置文件
pom.xml
– Sabot模块配置
sabot/kernel/src/main/resources/sabot-module.conf
5. 技术栈分析
5.1 核心技术框架
查询处理层
Apache Calcite 1.22.0:SQL解析、优化和执行计划生成
Apache Arrow 18.1.1:内存列式数据格式和向量化计算
Netty 4.1.126.Final:高性能网络通信框架
存储和数据格式
Apache Parquet 1.15.2:列式存储格式
Apache Avro 1.11.4:数据序列化框架
Apache Iceberg 1.7.0:数据湖表格式
大数据生态
Hadoop 3.3.6/2.8.5:分布式文件系统和资源管理
Hive 2.3.9/3.1.1:数据仓库软件
HBase:NoSQL数据库支持
通信和序列化
gRPC 1.70.0:高性能RPC框架
Protocol Buffers 3.25.5:数据序列化协议
云存储支持
AWS SDK:Amazon S3集成
Azure Storage SDK:Azure Blob Storage集成
Google Cloud Storage:GCS集成
5.2 开发和构建工具
Maven:项目构建和依赖管理
Java 11+:主要开发语言
React:前端用户界面框架
SLF4J + Logback:日志框架
6. 设计模式识别
6.1 插件化架构模式
Dremio采用插件化架构设计,通过接口实现不同数据源的统一访问:
StoragePlugin
// sabot/kernel/src/main/java/com/dremio/exec/store/StoragePlugin.java
public interface StoragePlugin extends AutoCloseable {
boolean hasAccessPermission(String user, NamespaceKey key, DatasetConfig datasetConfig);
SourceState getState();
ViewTable getView(List<String> tableSchemaPath, SchemaConfig schemaConfig);
Class<? extends StoragePluginRulesFactory> getRulesFactoryClass();
}
6.2 工厂模式
存储插件通过工厂模式创建和管理:
// sabot/kernel/src/main/java/com/dremio/exec/catalog/ManagedStoragePlugin.java
public class ManagedStoragePlugin implements StoragePlugin {
private final StoragePlugin plugin;
private final StoragePluginId pluginId;
public static ManagedStoragePlugin newPlugin(
SabotContext context,
StoragePluginId pluginId,
ConnectionConf<?, ?> config) {
// 工厂方法创建插件实例
}
}
6.3 观察者模式
查询执行过程中使用观察者模式处理状态变化:
// services/jobs/src/main/java/com/dremio/service/jobs/JobStatusListener.java
public interface JobStatusListener {
void jobSubmitted(JobId jobId);
void jobQueued(JobId jobId);
void jobStarted(JobId jobId);
void jobCompleted(JobId jobId);
void jobFailed(JobId jobId, String reason);
}
6.4 策略模式
查询优化器使用策略模式实现不同的优化规则:
// sabot/kernel/src/main/java/com/dremio/exec/planner/RulesFactory.java
public interface RulesFactory {
Set<RelOptRule> getRules(OptimizerRulesContext context);
}
7. 系统架构图
7.1 整体架构图
7.2 查询执行数据流图
7.3 存储插件架构图
8. 核心组件实现
8.1 DremioDaemon – 系统启动核心
是Dremio系统的启动入口,负责初始化所有核心服务:
DremioDaemon
// dac/daemon/src/main/java/com/dremio/dac/daemon/DremioDaemon.java
public class DremioDaemon {
public static void main(String[] args) throws Exception {
// 1. 初始化配置
final DACConfig dacConfig = DACConfig.newConfig();
final SabotConfig sabotConfig = dacConfig.getConfig().getSabotConfig();
// 2. 执行类路径扫描
final ScanResult classpathScan = ClassPathScanner.fromPrescan(sabotConfig);
// 3. 创建并启动守护进程
try (DACDaemon daemon = DACDaemon.newDremioDaemon(
dacConfig, classpathScan, DACModule.class)) {
daemon.init();
daemon.closeOnJVMShutDown();
}
}
}
8.2 查询执行引擎 – Sabot
Sabot是Dremio的查询执行引擎核心,配置文件定义了其关键组件:
# sabot/kernel/src/main/resources/sabot-module.conf
dremio.classpath.scanning: {
packages: [
com.dremio.exec.catalog, # 目录服务
com.dremio.exec.expr, # 表达式处理
com.dremio.exec.physical, # 物理计划
com.dremio.exec.planner.physical, # 物理规划器
com.dremio.exec.store, # 存储层
com.dremio.sabot, # Sabot核心
]
}
8.3 存储插件管理 – ManagedStoragePlugin
存储插件管理器负责插件的生命周期管理和元数据缓存:
// sabot/kernel/src/main/java/com/dremio/exec/catalog/ManagedStoragePlugin.java
public class ManagedStoragePlugin implements StoragePlugin {
private final StoragePlugin plugin;
private final StoragePluginId pluginId;
private final NamespaceService systemNamespace;
@Override
public SourceMetadata getSourceMetadata(
GetMetadataOption... options) throws ConnectorException {
// 获取数据源元数据,支持缓存和刷新
return plugin.getSourceMetadata(options);
}
@Override
public DatasetHandle getDatasetHandle(
EntityPath datasetPath,
GetDatasetOption... options) throws ConnectorException {
// 获取数据集句柄,用于后续数据访问
return plugin.getDatasetHandle(datasetPath, options);
}
}
8.4 查询优化器集成
Dremio基于Apache Calcite实现查询优化,通过规则工厂模式扩展优化规则:
// sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/direct/AccelCreateReflectionHandler.java
public class AccelCreateReflectionHandler implements SqlDirectHandler<SimpleCommandResult> {
private final Catalog catalog;
private final AccelerationManager accelManager;
@Override
public List<SimpleCommandResult> toResult(String sql, SqlNode sqlNode) throws Exception {
// 处理创建反射的SQL命令
SqlCreateReflection createReflection = SqlNodeUtil.unwrap(sqlNode, SqlCreateReflection.class);
// 创建布局定义
LayoutDefinition layoutDefinition = new LayoutDefinition()
.setName(createReflection.getName())
.setType(createReflection.getType());
// 通过加速管理器创建反射
accelManager.addLayout(layoutDefinition);
return Collections.singletonList(SimpleCommandResult.successful("Reflection created"));
}
}
8.5 作业服务 – JobsService
作业服务管理查询的整个生命周期:
// services/jobs/src/main/java/com/dremio/service/jobs/JobsService.java
public interface JobsService extends Service {
JobSubmission submitJob(JobRequest jobRequest, JobStatusListener statusListener);
JobResult getJobResults(JobId jobId, int offset, int limit);
void cancel(JobId jobId, String reason);
JobSummary getJobSummary(JobId jobId);
}
9. 核心流程解读
9.1 查询执行完整流程
以一个典型的SQL查询为例,展示Dremio的完整执行流程:
SELECT customer_name, SUM(order_amount)
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE order_date >= '2024-01-01'
GROUP BY customer_name;
步骤1:SQL解析和验证
// sabot/kernel/src/main/java/com/dremio/exec/planner/sql/SqlConverter.java
public class SqlConverter {
public SqlNode parse(String sql) {
// 使用Calcite解析器解析SQL
SqlParser parser = SqlParser.create(sql, parserConfig);
return parser.parseStmt();
}
public SqlNode validate(SqlNode sqlNode) {
// 验证SQL语法和语义
return validator.validate(sqlNode);
}
}
步骤2:逻辑计划生成
// sabot/kernel/src/main/java/com/dremio/exec/planner/sql/SqlToRelConverter.java
public RelRoot convertQuery(SqlNode query, boolean needsValidation, boolean top) {
// 将SQL转换为关系代数表达式
RelNode relNode = sqlToRelConverter.convertQuery(query, needsValidation, top).rel;
return RelRoot.of(relNode, query.getKind());
}
步骤3:查询优化
// sabot/kernel/src/main/java/com/dremio/exec/planner/PlannerPhase.java
public enum PlannerPhase {
LOGICAL {
@Override
public Prel doPlan(PlannerSettings plannerSettings, RelOptPlanner planner, RelNode relNode) {
// 逻辑优化:谓词下推、投影下推等
return (Prel) planner.findBestExp();
}
},
PHYSICAL {
@Override
public Prel doPlan(PlannerSettings plannerSettings, RelOptPlanner planner, RelNode relNode) {
// 物理优化:选择具体的算法实现
return (Prel) planner.findBestExp();
}
}
}
步骤4:物理计划执行
// sabot/kernel/src/main/java/com/dremio/sabot/exec/FragmentExecutor.java
public class FragmentExecutor {
public void run() {
try {
// 执行物理计划片段
injector.injectUnchecked(executionControls, INJECTOR_DURING_PLANNING);
// 创建操作符树
RootExec rootExec = ImplCreator.getExec(context, root);
// 执行查询
while (rootExec.next()) {
// 处理每个批次的数据
RecordBatch batch = rootExec.getRecordBatch();
// 发送结果到客户端
}
} finally {
// 清理资源
}
}
}
9.2 反射(Reflection)加速流程
Dremio的反射技术是其核心优势之一,通过预计算和物化视图提升查询性能:
步骤1:反射创建
// services/accelerator/src/main/java/com/dremio/service/reflection/ReflectionService.java
public class ReflectionService {
public ReflectionId createReflection(ReflectionGoal goal) {
// 1. 验证反射定义
validateReflectionGoal(goal);
// 2. 创建反射元数据
ReflectionId reflectionId = ReflectionId.generateId();
goal.setId(reflectionId);
// 3. 保存到存储
reflectionGoalsStore.save(goal);
// 4. 触发物化过程
materializationManager.requestMaterialization(reflectionId);
return reflectionId;
}
}
步骤2:物化执行
// services/accelerator/src/main/java/com/dremio/service/reflection/MaterializationManager.java
public class MaterializationManager {
public void materialize(ReflectionId reflectionId) {
ReflectionGoal goal = reflectionGoalsStore.get(reflectionId);
// 1. 生成物化查询
String materializationQuery = buildMaterializationQuery(goal);
// 2. 提交作业执行物化
JobRequest jobRequest = JobRequest.newBuilder()
.setSqlQuery(SqlQuery.newBuilder().setSql(materializationQuery))
.build();
JobSubmission submission = jobsService.submitJob(jobRequest,
new MaterializationStatusListener(reflectionId));
// 3. 更新物化状态
updateMaterializationStatus(reflectionId, submission.getJobId());
}
}
步骤3:查询匹配和重写
// sabot/kernel/src/main/java/com/dremio/exec/planner/acceleration/ReflectionMatcher.java
public class ReflectionMatcher {
public List<Materialization> findMatchingMaterializations(RelNode query) {
List<Materialization> matches = new ArrayList<>();
for (Materialization materialization : availableMaterializations) {
// 检查查询是否可以使用该物化视图
if (canUse(query, materialization)) {
matches.add(materialization);
}
}
return matches;
}
private boolean canUse(RelNode query, Materialization materialization) {
// 检查投影、过滤条件、聚合等是否匹配
return projectionMatches(query, materialization) &&
filterMatches(query, materialization) &&
aggregationMatches(query, materialization);
}
}
10. 实现示例和最佳实践
10.1 自定义存储插件开发
开发自定义存储插件的最佳实践:
// 1. 定义连接配置
@SourceType(value = "CUSTOM", label = "Custom Data Source")
public class CustomStoragePluginConfig extends ConnectionConf<CustomStoragePluginConfig, CustomStoragePlugin> {
@Property(label = "Connection URL")
public String connectionUrl;
@Property(label = "Username")
public String username;
@Secret
@Property(label = "Password")
public String password;
@Override
public CustomStoragePlugin newPlugin(SabotContext context, String name, Provider<StoragePluginId> pluginIdProvider) {
return new CustomStoragePlugin(this, context, name, pluginIdProvider);
}
}
// 2. 实现存储插件
public class CustomStoragePlugin extends StoragePluginCreator.PF4JStoragePlugin {
private final CustomStoragePluginConfig config;
public CustomStoragePlugin(CustomStoragePluginConfig config, SabotContext context, String name, Provider<StoragePluginId> idProvider) {
super(context, name, idProvider);
this.config = config;
}
@Override
public SourceMetadata getSourceMetadata(GetMetadataOption... options) throws ConnectorException {
// 实现元数据获取逻辑
return new SourceMetadata() {
@Override
public DatasetHandleListing listDatasetHandles(GetDatasetOption... options) {
// 返回数据集列表
}
};
}
@Override
public DatasetHandle getDatasetHandle(EntityPath datasetPath, GetDatasetOption... options) {
// 返回特定数据集的句柄
return new CustomDatasetHandle(datasetPath, config);
}
}
10.2 查询优化规则扩展
扩展查询优化规则的示例:
// 自定义优化规则
public class CustomOptimizationRule extends RelOptRule {
public static final CustomOptimizationRule INSTANCE = new CustomOptimizationRule();
private CustomOptimizationRule() {
super(operand(LogicalProject.class,
operand(LogicalFilter.class, any())),
"CustomOptimizationRule");
}
@Override
public void onMatch(RelOptRuleCall call) {
LogicalProject project = call.rel(0);
LogicalFilter filter = call.rel(1);
// 实现自定义优化逻辑
if (canOptimize(project, filter)) {
RelNode optimizedRel = createOptimizedRel(project, filter);
call.transformTo(optimizedRel);
}
}
private boolean canOptimize(LogicalProject project, LogicalFilter filter) {
// 检查是否可以应用优化
return true;
}
private RelNode createOptimizedRel(LogicalProject project, LogicalFilter filter) {
// 创建优化后的关系表达式
return project;
}
}
// 注册优化规则
public class CustomRulesFactory implements RulesFactory {
@Override
public Set<RelOptRule> getRules(OptimizerRulesContext context) {
return ImmutableSet.of(CustomOptimizationRule.INSTANCE);
}
}
10.3 向量化算子实现
实现高性能向量化算子的示例:
// sabot/kernel/src/main/java/com/dremio/sabot/op/aggregate/vectorized/VectorizedHashAggOperator.java
public class CustomVectorizedOperator extends VectorizedOperator {
private VectorContainer outgoing;
private VectorizedHashAggPartition[] partitions;
@Override
public VectorAccessible setup(VectorAccessible incoming) throws Exception {
this.incoming = incoming;
this.outgoing = new VectorContainer(oContext.getAllocator());
// 设置输出向量
setupOutputVectors();
return outgoing;
}
@Override
public void consumeData(int records) throws Exception {
// 处理输入数据批次
for (int i = 0; i < records; i++) {
// 向量化处理逻辑
processRecord(i);
}
}
@Override
public int outputData() throws Exception {
// 输出处理结果
int recordCount = 0;
for (VectorizedHashAggPartition partition : partitions) {
recordCount += partition.outputRecords(outgoing);
}
outgoing.setRecordCount(recordCount);
return recordCount;
}
private void processRecord(int index) {
// 实现具体的向量化计算逻辑
// 使用Arrow的向量化API进行高效计算
}
}
11. 部署方式
11.1 单机部署
参考项目根目录的README.md文件中的”Quick Start”部分:
安装JDK 11+
配置Maven环境
执行构建
mvn clean install -DskipTests
运行启动
./distribution/target/dremio-oss-*/bin/dremio start
11.2 集群部署
参考README.md中的”Running”部分:
配置协调节点和执行节点
设置共享存储和元数据存储
配置网络和安全设置
11.3 容器化部署
使用官方Docker镜像
配置Kubernetes部署文件
设置持久化存储和服务发现
12. 社区与文档资源
GitHub仓库: https://github.com/dremio/dremio-oss
官方文档: https://docs.dremio.com/
社区论坛: https://community.dremio.com/
技术博客: https://www.dremio.com/blog/
贡献指南: 参考项目根目录的CONTRIBUTING.md文件
13. 学习总结
13.1 核心设计理念
Dremio的设计体现了以下核心理念:
数据湖优先:专为数据湖场景设计,支持多种存储格式和云存储查询加速:通过反射技术实现智能的查询加速,无需手动调优插件化架构:高度可扩展的存储插件架构,支持异构数据源向量化计算:基于Apache Arrow的向量化执行引擎,提供极致性能自助式分析:提供直观的用户界面,降低数据分析门槛
13.2 技术亮点
智能查询优化:结合Calcite优化器和自研规则,实现智能查询重写反射技术:自动识别查询模式,创建最优的物化视图存储抽象:统一的存储插件接口,屏蔽底层存储差异分布式执行:基于Fragment的分布式执行模型,支持大规模并行计算内存管理:精细的内存管理和资源控制,确保系统稳定性
13.3 借鉴价值
对于分布式系统和数据处理引擎的设计,Dremio提供了以下借鉴价值:
插件化设计:通过接口抽象和工厂模式实现高度可扩展的架构查询优化:基于成熟框架(Calcite)进行扩展,而非重新发明轮子向量化计算:利用现代CPU特性和内存布局优化,提升计算性能微服务架构:合理的服务拆分和组件化设计,便于维护和扩展元数据管理:统一的元数据服务,支持多数据源的统一视图
Dremio作为现代数据湖引擎的典型代表,其技术架构和实现细节为构建高性能、可扩展的数据处理系统提供了宝贵的参考和借鉴价值。





















暂无评论内容