Dremio开源数据湖引擎

摘要

Dremio是一个现代化的数据湖引擎,提供高性能的SQL查询能力和数据虚拟化服务。本文深入分析Dremio的技术架构,涵盖其核心组件、设计模式、技术栈选型以及系统实现细节,为开发者提供全面的技术参考和学习指南。

1. 项目应用场景

Dremio作为数据湖引擎,主要应用于以下场景:

数据湖查询加速:通过反射(Reflection)技术对数据湖中的数据进行预聚合和索引,显著提升查询性能
多数据源联邦查询:支持跨多种数据源(Hadoop、S3、Azure、GCS、关系型数据库等)的统一SQL查询
数据虚拟化:提供数据的逻辑视图,无需物理移动数据即可实现数据整合和分析
自助式数据分析:为业务用户提供直观的数据探索和分析界面
实时数据处理:支持流式数据处理和近实时分析

2. 学习目标

结合Dremio的核心代码,本文设定以下学习目标:

理解分布式查询引擎架构:掌握Dremio的查询执行、优化和分布式计算机制学习存储插件设计模式:了解如何设计可扩展的数据源连接器架构掌握查询优化技术:学习基于Apache Calcite的查询优化器实现理解数据加速技术:掌握反射(Reflection)和物化视图的实现原理学习微服务架构设计:了解Dremio的服务化架构和组件间通信机制

3. 目录结构分析



dremio-oss/
├── build-tools/                    # 构建工具和配置
├── client/                         # 客户端相关代码
│   ├── base/                      # 基础客户端功能
│   └── jdbc/                      # JDBC驱动实现
├── common/                         # 通用工具和基础类
│   ├── legacy/                    # 遗留通用代码
│   └── src/                       # 核心通用功能
├── dac/                           # 数据分析中心(Data Analytics Center)
│   ├── backend/                   # DAC后端服务
│   ├── daemon/                    # 守护进程实现
│   └── ui/                        # Web用户界面
├── distribution/                   # 发布和部署相关
│   └── resources/                 # 启动脚本和配置文件
├── plugins/                       # 存储插件集合
│   ├── common/                    # 插件通用功能
│   ├── hive/                      # Hive连接器
│   ├── s3/                        # S3存储插件
│   ├── azure/                     # Azure存储插件
│   ├── gcs/                       # Google Cloud Storage插件
│   └── elasticsearch/             # Elasticsearch插件
├── sabot/                         # 查询执行引擎核心
│   ├── kernel/                    # 执行引擎内核
│   ├── logical/                   # 逻辑计划处理
│   └── serializer/                # 序列化组件
├── services/                      # 核心服务组件
│   ├── accelerator/               # 查询加速服务
│   ├── coordinator/               # 集群协调服务
│   ├── datastore/                 # 数据存储服务
│   ├── jobs/                      # 作业管理服务
│   └── namespace/                 # 命名空间服务
└── ui/                           # 前端用户界面
    └── src/                      # React前端代码

4. 关键文件清单

4.1 核心启动文件


dac/daemon/src/main/java/com/dremio/dac/daemon/DremioDaemon.java
– 主守护进程

distribution/resources/src/main/resources/bin/dremio
– 启动脚本

4.2 查询引擎核心


sabot/kernel/src/main/java/com/dremio/exec/planner/
– 查询规划器

sabot/kernel/src/main/java/com/dremio/exec/work/
– 查询执行器

sabot/kernel/src/main/java/com/dremio/sabot/
– Sabot执行引擎

4.3 存储插件架构


sabot/kernel/src/main/java/com/dremio/exec/catalog/ManagedStoragePlugin.java
– 存储插件管理

sabot/kernel/src/main/java/com/dremio/exec/store/StoragePlugin.java
– 存储插件接口

4.4 服务组件


services/jobs/src/main/java/com/dremio/service/jobs/JobsService.java
– 作业服务

services/coordinator/src/main/java/com/dremio/service/coordinator/
– 协调服务

services/namespace/src/main/java/com/dremio/service/namespace/
– 命名空间服务

4.5 配置文件


pom.xml
– Maven主配置文件

sabot/kernel/src/main/resources/sabot-module.conf
– Sabot模块配置

5. 技术栈分析

5.1 核心技术框架

查询处理层

Apache Calcite 1.22.0:SQL解析、优化和执行计划生成
Apache Arrow 18.1.1:内存列式数据格式和向量化计算
Netty 4.1.126.Final:高性能网络通信框架

存储和数据格式

Apache Parquet 1.15.2:列式存储格式
Apache Avro 1.11.4:数据序列化框架
Apache Iceberg 1.7.0:数据湖表格式

大数据生态

Hadoop 3.3.6/2.8.5:分布式文件系统和资源管理
Hive 2.3.9/3.1.1:数据仓库软件
HBase:NoSQL数据库支持

通信和序列化

gRPC 1.70.0:高性能RPC框架
Protocol Buffers 3.25.5:数据序列化协议

云存储支持

AWS SDK:Amazon S3集成
Azure Storage SDK:Azure Blob Storage集成
Google Cloud Storage:GCS集成

5.2 开发和构建工具

Maven:项目构建和依赖管理
Java 11+:主要开发语言
React:前端用户界面框架
SLF4J + Logback:日志框架

6. 设计模式识别

6.1 插件化架构模式

Dremio采用插件化架构设计,通过
StoragePlugin
接口实现不同数据源的统一访问:



// sabot/kernel/src/main/java/com/dremio/exec/store/StoragePlugin.java
public interface StoragePlugin extends AutoCloseable {
  boolean hasAccessPermission(String user, NamespaceKey key, DatasetConfig datasetConfig);
  SourceState getState();
  ViewTable getView(List<String> tableSchemaPath, SchemaConfig schemaConfig);
  Class<? extends StoragePluginRulesFactory> getRulesFactoryClass();
}

6.2 工厂模式

存储插件通过工厂模式创建和管理:



// sabot/kernel/src/main/java/com/dremio/exec/catalog/ManagedStoragePlugin.java
public class ManagedStoragePlugin implements StoragePlugin {
  private final StoragePlugin plugin;
  private final StoragePluginId pluginId;
  
  public static ManagedStoragePlugin newPlugin(
      SabotContext context,
      StoragePluginId pluginId,
      ConnectionConf<?, ?> config) {
    // 工厂方法创建插件实例
  }
}

6.3 观察者模式

查询执行过程中使用观察者模式处理状态变化:



// services/jobs/src/main/java/com/dremio/service/jobs/JobStatusListener.java
public interface JobStatusListener {
  void jobSubmitted(JobId jobId);
  void jobQueued(JobId jobId);
  void jobStarted(JobId jobId);
  void jobCompleted(JobId jobId);
  void jobFailed(JobId jobId, String reason);
}

6.4 策略模式

查询优化器使用策略模式实现不同的优化规则:



// sabot/kernel/src/main/java/com/dremio/exec/planner/RulesFactory.java
public interface RulesFactory {
  Set<RelOptRule> getRules(OptimizerRulesContext context);
}

7. 系统架构图

7.1 整体架构图

7.2 查询执行数据流图

7.3 存储插件架构图

8. 核心组件实现

8.1 DremioDaemon – 系统启动核心


DremioDaemon
是Dremio系统的启动入口,负责初始化所有核心服务:



// dac/daemon/src/main/java/com/dremio/dac/daemon/DremioDaemon.java
public class DremioDaemon {
  public static void main(String[] args) throws Exception {
    // 1. 初始化配置
    final DACConfig dacConfig = DACConfig.newConfig();
    final SabotConfig sabotConfig = dacConfig.getConfig().getSabotConfig();
    
    // 2. 执行类路径扫描
    final ScanResult classpathScan = ClassPathScanner.fromPrescan(sabotConfig);
    
    // 3. 创建并启动守护进程
    try (DACDaemon daemon = DACDaemon.newDremioDaemon(
        dacConfig, classpathScan, DACModule.class)) {
      daemon.init();
      daemon.closeOnJVMShutDown();
    }
  }
}

8.2 查询执行引擎 – Sabot

Sabot是Dremio的查询执行引擎核心,配置文件定义了其关键组件:



# sabot/kernel/src/main/resources/sabot-module.conf
dremio.classpath.scanning: {
  packages: [
    com.dremio.exec.catalog,      # 目录服务
    com.dremio.exec.expr,         # 表达式处理
    com.dremio.exec.physical,     # 物理计划
    com.dremio.exec.planner.physical, # 物理规划器
    com.dremio.exec.store,        # 存储层
    com.dremio.sabot,             # Sabot核心
  ]
}

8.3 存储插件管理 – ManagedStoragePlugin

存储插件管理器负责插件的生命周期管理和元数据缓存:



// sabot/kernel/src/main/java/com/dremio/exec/catalog/ManagedStoragePlugin.java
public class ManagedStoragePlugin implements StoragePlugin {
  private final StoragePlugin plugin;
  private final StoragePluginId pluginId;
  private final NamespaceService systemNamespace;
  
  @Override
  public SourceMetadata getSourceMetadata(
      GetMetadataOption... options) throws ConnectorException {
    // 获取数据源元数据,支持缓存和刷新
    return plugin.getSourceMetadata(options);
  }
  
  @Override
  public DatasetHandle getDatasetHandle(
      EntityPath datasetPath, 
      GetDatasetOption... options) throws ConnectorException {
    // 获取数据集句柄,用于后续数据访问
    return plugin.getDatasetHandle(datasetPath, options);
  }
}

8.4 查询优化器集成

Dremio基于Apache Calcite实现查询优化,通过规则工厂模式扩展优化规则:



// sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/direct/AccelCreateReflectionHandler.java
public class AccelCreateReflectionHandler implements SqlDirectHandler<SimpleCommandResult> {
  private final Catalog catalog;
  private final AccelerationManager accelManager;
  
  @Override
  public List<SimpleCommandResult> toResult(String sql, SqlNode sqlNode) throws Exception {
    // 处理创建反射的SQL命令
    SqlCreateReflection createReflection = SqlNodeUtil.unwrap(sqlNode, SqlCreateReflection.class);
    
    // 创建布局定义
    LayoutDefinition layoutDefinition = new LayoutDefinition()
        .setName(createReflection.getName())
        .setType(createReflection.getType());
    
    // 通过加速管理器创建反射
    accelManager.addLayout(layoutDefinition);
    
    return Collections.singletonList(SimpleCommandResult.successful("Reflection created"));
  }
}

8.5 作业服务 – JobsService

作业服务管理查询的整个生命周期:



// services/jobs/src/main/java/com/dremio/service/jobs/JobsService.java
public interface JobsService extends Service {
  JobSubmission submitJob(JobRequest jobRequest, JobStatusListener statusListener);
  
  JobResult getJobResults(JobId jobId, int offset, int limit);
  
  void cancel(JobId jobId, String reason);
  
  JobSummary getJobSummary(JobId jobId);
}

9. 核心流程解读

9.1 查询执行完整流程

以一个典型的SQL查询为例,展示Dremio的完整执行流程:



SELECT customer_name, SUM(order_amount) 
FROM orders o 
JOIN customers c ON o.customer_id = c.id 
WHERE order_date >= '2024-01-01' 
GROUP BY customer_name;

步骤1:SQL解析和验证



// sabot/kernel/src/main/java/com/dremio/exec/planner/sql/SqlConverter.java
public class SqlConverter {
  public SqlNode parse(String sql) {
    // 使用Calcite解析器解析SQL
    SqlParser parser = SqlParser.create(sql, parserConfig);
    return parser.parseStmt();
  }
  
  public SqlNode validate(SqlNode sqlNode) {
    // 验证SQL语法和语义
    return validator.validate(sqlNode);
  }
}

步骤2:逻辑计划生成



// sabot/kernel/src/main/java/com/dremio/exec/planner/sql/SqlToRelConverter.java
public RelRoot convertQuery(SqlNode query, boolean needsValidation, boolean top) {
  // 将SQL转换为关系代数表达式
  RelNode relNode = sqlToRelConverter.convertQuery(query, needsValidation, top).rel;
  return RelRoot.of(relNode, query.getKind());
}

步骤3:查询优化



// sabot/kernel/src/main/java/com/dremio/exec/planner/PlannerPhase.java
public enum PlannerPhase {
  LOGICAL {
    @Override
    public Prel doPlan(PlannerSettings plannerSettings, RelOptPlanner planner, RelNode relNode) {
      // 逻辑优化:谓词下推、投影下推等
      return (Prel) planner.findBestExp();
    }
  },
  
  PHYSICAL {
    @Override
    public Prel doPlan(PlannerSettings plannerSettings, RelOptPlanner planner, RelNode relNode) {
      // 物理优化:选择具体的算法实现
      return (Prel) planner.findBestExp();
    }
  }
}

步骤4:物理计划执行



// sabot/kernel/src/main/java/com/dremio/sabot/exec/FragmentExecutor.java
public class FragmentExecutor {
  public void run() {
    try {
      // 执行物理计划片段
      injector.injectUnchecked(executionControls, INJECTOR_DURING_PLANNING);
      
      // 创建操作符树
      RootExec rootExec = ImplCreator.getExec(context, root);
      
      // 执行查询
      while (rootExec.next()) {
        // 处理每个批次的数据
        RecordBatch batch = rootExec.getRecordBatch();
        // 发送结果到客户端
      }
    } finally {
      // 清理资源
    }
  }
}

9.2 反射(Reflection)加速流程

Dremio的反射技术是其核心优势之一,通过预计算和物化视图提升查询性能:

步骤1:反射创建



// services/accelerator/src/main/java/com/dremio/service/reflection/ReflectionService.java
public class ReflectionService {
  public ReflectionId createReflection(ReflectionGoal goal) {
    // 1. 验证反射定义
    validateReflectionGoal(goal);
    
    // 2. 创建反射元数据
    ReflectionId reflectionId = ReflectionId.generateId();
    goal.setId(reflectionId);
    
    // 3. 保存到存储
    reflectionGoalsStore.save(goal);
    
    // 4. 触发物化过程
    materializationManager.requestMaterialization(reflectionId);
    
    return reflectionId;
  }
}

步骤2:物化执行



// services/accelerator/src/main/java/com/dremio/service/reflection/MaterializationManager.java
public class MaterializationManager {
  public void materialize(ReflectionId reflectionId) {
    ReflectionGoal goal = reflectionGoalsStore.get(reflectionId);
    
    // 1. 生成物化查询
    String materializationQuery = buildMaterializationQuery(goal);
    
    // 2. 提交作业执行物化
    JobRequest jobRequest = JobRequest.newBuilder()
        .setSqlQuery(SqlQuery.newBuilder().setSql(materializationQuery))
        .build();
    
    JobSubmission submission = jobsService.submitJob(jobRequest, 
        new MaterializationStatusListener(reflectionId));
    
    // 3. 更新物化状态
    updateMaterializationStatus(reflectionId, submission.getJobId());
  }
}

步骤3:查询匹配和重写



// sabot/kernel/src/main/java/com/dremio/exec/planner/acceleration/ReflectionMatcher.java
public class ReflectionMatcher {
  public List<Materialization> findMatchingMaterializations(RelNode query) {
    List<Materialization> matches = new ArrayList<>();
    
    for (Materialization materialization : availableMaterializations) {
      // 检查查询是否可以使用该物化视图
      if (canUse(query, materialization)) {
        matches.add(materialization);
      }
    }
    
    return matches;
  }
  
  private boolean canUse(RelNode query, Materialization materialization) {
    // 检查投影、过滤条件、聚合等是否匹配
    return projectionMatches(query, materialization) &&
           filterMatches(query, materialization) &&
           aggregationMatches(query, materialization);
  }
}

10. 实现示例和最佳实践

10.1 自定义存储插件开发

开发自定义存储插件的最佳实践:



// 1. 定义连接配置
@SourceType(value = "CUSTOM", label = "Custom Data Source")
public class CustomStoragePluginConfig extends ConnectionConf<CustomStoragePluginConfig, CustomStoragePlugin> {
  
  @Property(label = "Connection URL")
  public String connectionUrl;
  
  @Property(label = "Username")
  public String username;
  
  @Secret
  @Property(label = "Password")
  public String password;
  
  @Override
  public CustomStoragePlugin newPlugin(SabotContext context, String name, Provider<StoragePluginId> pluginIdProvider) {
    return new CustomStoragePlugin(this, context, name, pluginIdProvider);
  }
}
 
// 2. 实现存储插件
public class CustomStoragePlugin extends StoragePluginCreator.PF4JStoragePlugin {
  
  private final CustomStoragePluginConfig config;
  
  public CustomStoragePlugin(CustomStoragePluginConfig config, SabotContext context, String name, Provider<StoragePluginId> idProvider) {
    super(context, name, idProvider);
    this.config = config;
  }
  
  @Override
  public SourceMetadata getSourceMetadata(GetMetadataOption... options) throws ConnectorException {
    // 实现元数据获取逻辑
    return new SourceMetadata() {
      @Override
      public DatasetHandleListing listDatasetHandles(GetDatasetOption... options) {
        // 返回数据集列表
      }
    };
  }
  
  @Override
  public DatasetHandle getDatasetHandle(EntityPath datasetPath, GetDatasetOption... options) {
    // 返回特定数据集的句柄
    return new CustomDatasetHandle(datasetPath, config);
  }
}

10.2 查询优化规则扩展

扩展查询优化规则的示例:



// 自定义优化规则
public class CustomOptimizationRule extends RelOptRule {
  
  public static final CustomOptimizationRule INSTANCE = new CustomOptimizationRule();
  
  private CustomOptimizationRule() {
    super(operand(LogicalProject.class, 
                 operand(LogicalFilter.class, any())), 
          "CustomOptimizationRule");
  }
  
  @Override
  public void onMatch(RelOptRuleCall call) {
    LogicalProject project = call.rel(0);
    LogicalFilter filter = call.rel(1);
    
    // 实现自定义优化逻辑
    if (canOptimize(project, filter)) {
      RelNode optimizedRel = createOptimizedRel(project, filter);
      call.transformTo(optimizedRel);
    }
  }
  
  private boolean canOptimize(LogicalProject project, LogicalFilter filter) {
    // 检查是否可以应用优化
    return true;
  }
  
  private RelNode createOptimizedRel(LogicalProject project, LogicalFilter filter) {
    // 创建优化后的关系表达式
    return project;
  }
}
 
// 注册优化规则
public class CustomRulesFactory implements RulesFactory {
  @Override
  public Set<RelOptRule> getRules(OptimizerRulesContext context) {
    return ImmutableSet.of(CustomOptimizationRule.INSTANCE);
  }
}

10.3 向量化算子实现

实现高性能向量化算子的示例:



// sabot/kernel/src/main/java/com/dremio/sabot/op/aggregate/vectorized/VectorizedHashAggOperator.java
public class CustomVectorizedOperator extends VectorizedOperator {
  
  private VectorContainer outgoing;
  private VectorizedHashAggPartition[] partitions;
  
  @Override
  public VectorAccessible setup(VectorAccessible incoming) throws Exception {
    this.incoming = incoming;
    this.outgoing = new VectorContainer(oContext.getAllocator());
    
    // 设置输出向量
    setupOutputVectors();
    
    return outgoing;
  }
  
  @Override
  public void consumeData(int records) throws Exception {
    // 处理输入数据批次
    for (int i = 0; i < records; i++) {
      // 向量化处理逻辑
      processRecord(i);
    }
  }
  
  @Override
  public int outputData() throws Exception {
    // 输出处理结果
    int recordCount = 0;
    
    for (VectorizedHashAggPartition partition : partitions) {
      recordCount += partition.outputRecords(outgoing);
    }
    
    outgoing.setRecordCount(recordCount);
    return recordCount;
  }
  
  private void processRecord(int index) {
    // 实现具体的向量化计算逻辑
    // 使用Arrow的向量化API进行高效计算
  }
}

11. 部署方式

11.1 单机部署

参考项目根目录的README.md文件中的”Quick Start”部分:

安装JDK 11+
配置Maven环境
执行
mvn clean install -DskipTests
构建
运行
./distribution/target/dremio-oss-*/bin/dremio start
启动

11.2 集群部署

参考README.md中的”Running”部分:

配置协调节点和执行节点
设置共享存储和元数据存储
配置网络和安全设置

11.3 容器化部署

使用官方Docker镜像
配置Kubernetes部署文件
设置持久化存储和服务发现

12. 社区与文档资源

GitHub仓库: https://github.com/dremio/dremio-oss
官方文档: https://docs.dremio.com/
社区论坛: https://community.dremio.com/
技术博客: https://www.dremio.com/blog/
贡献指南: 参考项目根目录的CONTRIBUTING.md文件

13. 学习总结

13.1 核心设计理念

Dremio的设计体现了以下核心理念:

数据湖优先:专为数据湖场景设计,支持多种存储格式和云存储查询加速:通过反射技术实现智能的查询加速,无需手动调优插件化架构:高度可扩展的存储插件架构,支持异构数据源向量化计算:基于Apache Arrow的向量化执行引擎,提供极致性能自助式分析:提供直观的用户界面,降低数据分析门槛

13.2 技术亮点

智能查询优化:结合Calcite优化器和自研规则,实现智能查询重写反射技术:自动识别查询模式,创建最优的物化视图存储抽象:统一的存储插件接口,屏蔽底层存储差异分布式执行:基于Fragment的分布式执行模型,支持大规模并行计算内存管理:精细的内存管理和资源控制,确保系统稳定性

13.3 借鉴价值

对于分布式系统和数据处理引擎的设计,Dremio提供了以下借鉴价值:

插件化设计:通过接口抽象和工厂模式实现高度可扩展的架构查询优化:基于成熟框架(Calcite)进行扩展,而非重新发明轮子向量化计算:利用现代CPU特性和内存布局优化,提升计算性能微服务架构:合理的服务拆分和组件化设计,便于维护和扩展元数据管理:统一的元数据服务,支持多数据源的统一视图

Dremio作为现代数据湖引擎的典型代表,其技术架构和实现细节为构建高性能、可扩展的数据处理系统提供了宝贵的参考和借鉴价值。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容