0%

使用Apache Druid进行海量数据的存储与查询

背景说明

本文为项目实践总结,目标对用户页面浏览信息进行存储,存储周期为三年;同时需要对三年存储数据进行实时查询供运营人员分析。项目架构大致为:用户页面浏览信息实时写入Kafka,内存数据库实时消费Kafka数据进行合并去重等运算后每个用户保存一天的数据在内存中,多余的数据需要转存到时序数据库。网站用户量达到1亿,根据对线上环境的分析,每条浏览信息大概82个字节(包含时间戳,用户编号等信息),平均转存入时序数据库的数据OPS在7W左右。

公司内部有自研的实时数据分析平台NimbleRACE,集成了Apache Druid作为时序数据库,故此在该平台上进行测试以确定Apache Druid是否能够满足项目需求。

1. Apache Druid简介

Druid是Apache基金会下一款开源高性能实时分析数据库。Druid常用于需要实时数据摄取、高速查询的场景,提供高并发的聚合查询API,助力上层GUI应用分析。

1.1 基本概念

  • DataSource:数据源。定义了数据的摄取地址,摄取的Worker数,同时会定义数据的Schema。使用上类似于RDBMS中的一个数据表
  • Segment:Druid数据的数据存储单元。一个Segment中可以包含几百万行数据
  • 元数据存储(MetaStorage):存储集群元数据,包括数据源配置,Segment信息等
  • 底层存储(DeepStorage):Segment实际存储的地方
  • SegmentCache:Druid Historical用来缓存segment的地方。所有的Segment Cache的存储目录的容量应该与底层存储中Segment占用容量保持1:1的比例

1.2 架构及主要运行逻辑

Druid采用多服务分布式架构,集群由多个服务构成,每个服务又可以配置多个,这样Druid集群的容错性将会很好。Druid使用Zookeeper进行服务发现以及大量的服务交互。

最新版本的Druid将Druid的进程分为三类:Master,Data,Query。

其中Master包括: Coordinator和Overlord。Data包括:MiddleManager、Historical以及MiddleManager创建的Peon进程(摄取数据的Worker)。Query包括:Broker和Router。

各服务主要职责如下:

  • Coordinator: 负责管理集群数据的可用性
  • Overlord: 负责管理数据摄取工作
  • MiddleManager:接收摄取任务,生产摄取Worker(Peon)
  • Historical:对外缓存历史数据,对外提供查询服务
  • Broker: 对外提供查询服务
  • Router: 提供查询请求的路由服务

Druid架构图如下所示(来自官网):

img

数据写入逻辑简述

  • 提交DataSource配置文件到Overlord
  • Overlord根据DataSource中配置的taskCount,指派不同的MiddleManger生成响应的Peon Worker
  • Peon Worker摄取数据。生产Segment后,将Segmnent写入到DeepStorage。并将Segment记录到数据库中。
  • Coordinator将新生成的Segment信息,通过Zookeeper发送给Historical
  • Historical从DeepStorage中下载Segment到本地磁盘的Segment-Cache中(这也称作load segment操作),这样Segment就可以对外提供查询服务了

数据读取逻辑简述

  • 查询请求发送到Broker进行
  • Broker进程计算请求的数据时间范围所在的segment分布于哪些Historical节点或者MiddleManger上
  • Broker将请求发送到上述服务
  • 上述服务数据获取后发送给Broker
  • Broker做合并运算后最终返回给客户端

1.3 数据源

Druid支持流式数据摄入和批量数据摄入。其中流式数据支持Kafka, Kinesis和Tranquility。批量支持Hadoop和本地批量数据。详细信息请参考Druid Ingestion

  • Schema:定义数据源的格式。

  • Dimension维度配置:通常数据源中字符串类型的字段被定义成Dimension(维度)字段。可以对维度字段建立Bitmap索引。(如果某个维度列包含上亿种可能,那么用户应当慎重考虑创建索引。实际测试发现,这种情况下会非常影响Druid的写入性能,CPU占用率也会非常高。)

  • Metric:通常数据源中数字类型的字段可以被定义成Metric。Metric字段可以被rollup。

  • Rollup (聚合):数据摄入阶段,Druid可以把包含相同维度字段的行按照Metric进行聚合。聚合会将Metric字段分别求和。Rollup可以大幅度减少行数从而缩小整个数据集的大小。但是在执行Rollup的同时,也会丢失原始数据。

  • BitmapIndex 索引配置:Druid可以对Dimension字段配置Bitmap索引。配置了索引后,按维度查询能够提升速度,但也会增加CPU使用,增加Segment容量。

  • Tune(调优)配置:包含许多配置。比较重要的有:taskCount,replica, maxRowInSegment, maxBytesInMemory, maxRowsInMemory等。具体可参照官网文档

本项目使用Kafka作为数据源。

1.4 DeepStorage 底层存储

Druid生产环境下推荐使用HDFS或S3作为底层数据存储,测试环境下可以使用nfs等共享目录作为存储。

注意,Druid要求Historical服务能够访问到所有的,因此不能直接使用本地文件系统作为存储,否则会出现无法加载Segment的情况。

本项目使用S3作为底层存储。

1.5 MetaStorage 元数据存储

Druid生产环境中推荐使用MySQL或PostgreSQL作为元数据存储,测试环境中可使用自带的Derby数据库。

本项目使用MySQL作为元数据数据库。

1.6 数据查询

Druid主要提供两种数据查询方式:SQL和JSON OVER HTTP(Native Query)。SQL查询使用更简单。

本项目使用SQL进行查询。

2. 海量数据下的性能测试

2.1 硬件配置

节点 cpu 内存 网络 硬盘 OS
Node1 E5-2620 v4 @ 2.10GHz *2 160G 10Gb * 1,1Gb * 1 4T * 18 CentOS7.6
Node2 E5-2620 v4 @ 2.10GHz *2 160G 10Gb * 1,1Gb * 1 4T * 18 CentOS7.6
Node3 E5-2620 v4 @ 2.10GHz *2 160G 10Gb * 1,1Gb * 1 4T * 18 CentOS7.6

2.2 服务拓扑

img

2.3 硬盘分配

每个节点18块4T机械盘

  • 采用ROCK-S3作为Druid的Deep Storage以及Indexing Log的存储。每个节点7块硬盘添加给ROCK分布式存储
  • 由于Druid本身架构所限,Druid需要与DeepStorage1:1大小的本地磁盘空间,因此需要分配7块硬盘给Druid作为Segment-Cache使用
  • 每个节点给Kafka配置两块机械盘即可(实际环境中Druid写入速度大于三年历史轨迹的生成速度,因此不必担心Kafka数据来不及消费导致空间不足的问题)
  • Zookeeper单独分配一块磁盘
  • 每个节点上单独挂一块磁盘,作为indexTask的baseTaskDir。(druid.indexer.task.baseTaskDir参数)

2.4 数据模拟

  • 每条模拟数据大概82个字节
  • 数据主要包含如下字段:时间戳,用户编号,用户性别,用户年龄,用户浏览的页面ID,停留时间
  • 其中用户编号达到1亿个,页面ID达到10W个
  • 模拟数据以CSV格式写入Kafka,Druid解析CSV数据

2.5 测试方法

  • 使用数据生成工具往Kafka中写入测试数据
  • 配置Druid数据源接入Kafka数据源
  • 通过Druid的Metric上报机制分析统计Druid写入性能
  • 通过Druid提供的WebConsole进行数据查询,分析查询性能

2.6 测试集合

2.6.1 DataSource 8 task下的写入

  • Kafka创建8个分区
  • 设置DataSource taskCount 8。8个Peon任务同时消费Kafka
  • 数据生成工具写入Kafka压力为50W OPS
  • 不建立索引

最终统计得到的Druid写入性能为如下图所示,平均OPS在30W左右:

img

cache dropped表示开始测试前执行了 echo 3 > /proc/sys/vm/drup_cache 释放了buffer和cache内存

使用TOP监测三个机器的资源使用情况:


CPU
从上面三张图中可看到,三个节点的CPU占用率均不高,大概都在1200%~1500%左右。

内存

服务 内存 数量
Peon(Worker) 3G 3 * 3
Historical 16G 3
Master 10.3G 1
Broker 7.9G 1
Router 1.1G 1

共94G内存提供30W的OPS

2.6.2 DataSource 16 task下的写入

  • Kafka创建16个分区
  • 设置DataSource taskCount 16。16个Peon任务同时消费Kafka
  • 数据生成工具写入Kafka压力为50W OPS
  • 不建立索引

最终统计得到的Druid写入性能为如下图所示,平均OPS在23W左右:

使用TOP监测三个机器的资源使用情况:

CPU
从上面三张图中可看到,三个节点的CPU占用率很高,基本已经跑满。

内存

服务 内存 数量
Peon(Worker) 2G 16
Historical 14G 3
Master 10G 1
Broker 8G 1
Router 1G 1

共93G内存提供23W的写入OPS

与8Worker对比中我们可以看到,提升了worker数。反而Druid集群的写入能力降低了,一个可能的原因是16个worker之间的CPU抢占。

2.6.3 DataSource 8 task, 添加3个索引下的写入

  • Kafka创建12个分区
  • 设置DataSource taskCount 12。12个Peon任务同时消费Kafka
  • 数据生成工具写入Kafka压力为50W OPS
  • 对用户编号,用户年龄,用户浏览页面ID三个维度字段建立索引

在实际测试中,发现maxRowInSegment使用默认的500W时,三索引下Druid写入性能急剧下降,并观测到CPU全部跑满。出现这个的原因是,我们系统的用户量达到1亿。根据Druid的索引实现方式,对每个Segment都会生成一个1亿列,500W行的Bitmap索引;一个包含1亿元素的Map结构。每消费到一条数据都会访问上述的两个结构,这对写入性能将会造成很大影响。

解决上述问题的一种方式是,修改Tune Parameter中的maxRowInSegment将其修改为50W。另外一种尝试是,在写入Kafka前,将用户编号分别映射到Kafka的不同分区,这样每个Kafka分区中数据的用户编号集合大概为1亿/分区数。Druid每个Worker会去消费不同的Kafka分区,对每个Worker来说,其处理的用户编号就变少了,Bitmap也会变小。

Tune

  • maxRowInSegment调整为50W
  • 模拟数据按照用户编号分别写入Kafka不同分区

最终统计得到的Druid写入性能为如下图所示,平均OPS在22W左右:

使用TOP监测三个机器的资源使用情况:

CPU
从上面三张图中可看到,三个节点的CPU占用率不是很高。

内存

服务 内存 数量
Peon(Worker) 3G 16
Historical 15G 3
Master 10G 1
Broker 8G 1
Router 1G 1

共112G内存提供22W的写入OPS。这与我们的预期应该是一致的,增加索引,内存占用自然也会增加。

2.6.4 Druid查询性能

使用Druid Web Console中的Query工具进行SQL查询。使用的查询语句为:指定用户编号,开始,截止时间,LIMIT 800。

实际整个数据集中,一个用户最多不超过200条数据,LIMIT 800相当于会扫描指定时间内所有的数据。

无索引

DataSource总数据量183亿行,指定时间范围内总数据量为78亿。查询某个用户的所有记录数耗时为40~50秒左右。

用户编号索引

DataSource指定时间范围内总数据量为110亿。查询某个用户的所有记录数耗时为20秒左右。

img
img

与无索引情况下对比,我们可以看到添加索引是能增加查询效率的

2.6.5 数据压缩比

无索引下

图中可以看到存储15亿行数据大概使用了400G的空间。每行数据大概82个字节。这样算来,数据压缩比大概为 2.8:1,即Druid压缩了容量。

img

三索引下

图中可以看到存储19亿行数据大概使用了2.24T的空间。每行数据大概82个字节。这样算来,数据压缩比大概为0.6:1,即Druid反而增加了容量占用,这里的原因是索引会占用了大量空间。

img

2.7 测试结论

  • Druid能够提供稳定的写入性能,三个节点不加索引下能够提供大概30W的OPS,增加索引(索引量较大)对CPU的使用率增高,写入性能也有所下降
  • 增加Druid索引能提升查询性能。100亿行数据,根据索引大概能在20s左右查询完毕
  • 不加索引下Druid能够对原始数据进行有效压缩另外,如果开启了RollUP,那么压缩程度会更高
  • 从上面数据压缩比小节中可以看出:相同的数据源,增加索引后,当索引量较大时,会对存储容量有较大的使用
  • 从16Task写入测试的结果来看,当Task太多CPU占用过高时,写入性能会有所波动,整体性能也会有所下降

2.8 一点思考

  • Druid作为时序数据库,正如其官网中所讲可能更加适合执行OLAP相关操作,比如对某一列进行count, sum等运算。从查询性能一节中可看到,其按整行进行select的性能并不算非常出众。
  • 在本项目中,对时序数据根据时间戳进行扫描的场景,Druid没有展现出特别大的能力。可能像HBase这种数据库会更加合适:将用户编号设置为RowKey,其他字段放到一个列族里的。

3. 参考资料