Kafka与ClickHouse
Kafka下载与安装
下载
官网下载：https://kafka.apache.org/downloads
确保已经安装 JDK 8+ 环境，可使用 `javac -version` 命令测试
安装
解压
# Extract the Kafka 3.1.0 (Scala 2.12 build) archive into /opt
tar -zxvf kafka_2.12-3.1.0.tgz -C /opt
建议建立软链接（让 /opt/kafka 指向具体版本目录，便于日后升级切换）
# Create /opt/kafka -> /opt/kafka_2.12-3.1.0. Absolute paths make this work from
# any directory; no trailing slash on the link name, since "kafka/" can make ln
# expect an already-existing directory instead of creating the link.
ln -s /opt/kafka_2.12-3.1.0 /opt/kafka
查看目录结构
▶ ll /opt/kafka/
total 64K
drwxr-xr-x. 3 root root 4.0K Jan 12 17:04 bin
drwxr-xr-x. 3 root root 4.0K Mar 8 14:36 config
drwxr-xr-x. 2 root root 8.0K Mar 8 14:32 libs
-rw-r--r--. 1 root root 15K Jan 12 17:01 LICENSE
drwxr-xr-x. 2 root root 262 Jan 12 17:04 licenses
drwxr-xr-x. 2 root root 237 Mar 8 14:41 logs
-rw-r--r--. 1 root root 28K Jan 12 17:01 NOTICE
drwxr-xr-x. 2 root root 44 Jan 12 17:04 site-docs
Kafka常用命令
管理
# All commands below are run from the Kafka install directory (e.g. /opt/kafka).
# Start ZooKeeper in the background
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Start the Kafka broker in the background
bin/kafka-server-start.sh -daemon config/server.properties
## Create a topic (1 partition, 1 replica) -- Kafka < 2.2
## (--zookeeper was removed from kafka-topics.sh in Kafka 3.0)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
## Create a topic (1 partition, 1 replica) -- Kafka >= 2.2
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
## Increase the partition count (it can only grow, never shrink)
# Kafka < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
# Kafka >= 2.2
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2
## Delete a topic
# Kafka < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
# Kafka >= 2.2 (required on 3.x, which no longer accepts --zookeeper here)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
查询
## Describe topics (partitions, leader, replicas, ISR)
# Kafka < 2.2 (--zookeeper was removed from kafka-topics.sh in Kafka 3.0)
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181
# Kafka >= 2.2
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092
## List topics (Kafka < 2.2)
bin/kafka-topics.sh --list --zookeeper localhost:2181
## List topics (Kafka >= 2.2; kafka-topics.sh gained --bootstrap-server in 2.2)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
## List consumer groups (0.9 - 1.x; --new-consumer was removed in 2.0)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
## List consumer groups (0.10+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
## Show a consumer group's offsets (only for offsets stored in ZooKeeper;
## ConsumerOffsetChecker is a legacy tool absent from modern releases)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
## Show a consumer group's offsets/lag (0.9 up to, but not including, 0.10.1.0)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
## Show a consumer group's offsets/lag (0.10.1.0+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
生产和消费
## Console producer (--broker-list; superseded by --bootstrap-server in 2.5+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
## Console producer (Kafka >= 2.5)
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
## Console consumer via ZooKeeper (old consumer; Kafka < 2.0 only)
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
## Console producer with a producer properties file (0.9+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
## Console consumer from the earliest offset with a consumer properties file (0.9+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --consumer.config config/consumer.properties --topic test
## kafka-verifiable-consumer.sh: prints consumer events (e.g. offset commits)
bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName
## Read 10 messages from partition 0 starting at offset 1234
# Legacy tool (removed together with the old Scala clients in Kafka 2.0)
bin/kafka-simple-consumer-shell.sh --broker-list localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10
# Equivalent with the modern console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10
ClickHouse使用Kafka外表进行数据的增量更新
kafka:
- 创建topic
# The topic name must equal kafka_topic_list ('toll_detail_final') of the
# ClickHouse Kafka engine table, otherwise ClickHouse never sees the messages.
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic toll_detail_final
clickhouse:
- 创建Kafka消费表(Kafka引擎)
-- Kafka engine table: consumes JSONEachRow messages from topic
-- 'toll_detail_final' as consumer group 'tdfConsumerGroup'. It stores nothing
-- itself; the materialized view view_tdf_consumer moves the consumed rows into
-- default.toll_detail_final. Per the ClickHouse docs, each message can be read
-- only once, so avoid ad-hoc SELECTs on this table outside of debugging.
CREATE TABLE default.toll_detail_final_kafka
(
    `id` Int64,
    `toll_detail_id` Int64,
    `order_id` String,
    `toll_duty_id` String,
    `park_id` Int64,
    `licence_plate` String,
    `brand` String,
    `toll_man_name` String,
    `toll_man_id` Int64,
    -- NOTE(review): the sample messages carry timestamps as epoch milliseconds
    -- (e.g. 1649317245039); confirm they parse into DateTime64(3) as intended.
    `in_time` Nullable(DateTime64(3, 'Asia/Shanghai')),
    `out_time` Nullable(DateTime64(3, 'Asia/Shanghai')),
    `event_time` DateTime64(3, 'Asia/Shanghai'),
    `in_location` String,
    `in_camera_id` Int64,
    `out_location` String,
    `out_camera_id` Int64,
    `fee_expected` Decimal(15, 2),
    `fee_cash` Decimal(15, 2),
    `fee_third_part` Decimal(15, 2),
    `fee_third_part_real` Decimal(15, 2),
    `fee_member_point` Decimal(15, 2),
    `fee_member_benefit` Decimal(15, 2),
    `fee_etc` Decimal(15, 2),
    `fee_third_part_deduction` Decimal(15, 2),
    `fee_cash_by_pos` Decimal(15, 2),
    `fee_online` Decimal(15, 2),
    `fee_coupon` Decimal(15, 2),
    `paper_coupon_fee` Decimal(15, 2),
    `elec_coupon_fee` Decimal(15, 2),
    `paper_coupon_hours` Int64,
    `elec_coupon_hours` Int64,
    `free_reason` String,
    `free_amount` Decimal(15, 2),
    `fee_balance` Decimal(15, 2),
    `owner_id` Int64,
    `owner_name` String,
    `car_type` String,
    `note` String,
    `lift_by_hand` Int32,
    `in_exception_type` String,
    `out_exception_type` String,
    `status` Int32,
    `updated_at` DateTime64(3, 'Asia/Shanghai'),
    `created_at` DateTime64(3, 'Asia/Shanghai'),
    `description` String,
    `fee_exception` Decimal(15, 2),
    `fee_reserved1` Decimal(15, 2),
    `fee_reserved2` Decimal(15, 2)
)
ENGINE = Kafka()
SETTINGS
    -- Loopback broker, the same one the Kafka CLI commands use (localhost:9092).
    kafka_broker_list = '127.0.0.1:9092',
    kafka_topic_list = 'toll_detail_final',
    kafka_group_name = 'tdfConsumerGroup',
    kafka_format = 'JSONEachRow',
    -- Tolerate up to 10 unparsable messages per block instead of failing.
    kafka_skip_broken_messages = 10;
- 创建实际存储数据的实体表（ReplacingMergeTree 引擎，属于 MergeTree 系列）
以下以已创建的 tdf、tt 两张表中的 toll_detail_final（tdf）表为例：
-- Storage table for toll detail records, filled by the materialized view
-- view_tdf_consumer from the Kafka engine table.
-- ReplacingMergeTree(updated_at): rows sharing the sorting key (id) are
-- collapsed during background merges, keeping the row with the greatest
-- updated_at, so a re-sent message for the same id acts as an update.
-- Merges are asynchronous: duplicates can be visible until one runs, so use
-- SELECT ... FINAL when exact latest-per-id results are required.
CREATE TABLE default.toll_detail_final
(
    `id` Int64,
    `toll_detail_id` Int64,
    `order_id` String,
    `toll_duty_id` String,
    `park_id` Int64,
    `licence_plate` String,
    `brand` String,
    `toll_man_name` String,
    `toll_man_id` Int64,
    `in_time` Nullable(DateTime64(3, 'Asia/Shanghai')),
    `out_time` Nullable(DateTime64(3, 'Asia/Shanghai')),
    `event_time` Nullable(DateTime64(3, 'Asia/Shanghai')),
    `in_location` String,
    `in_camera_id` Int64,
    `out_location` String,
    `out_camera_id` Int64,
    `fee_expected` Decimal(15, 2),
    `fee_cash` Decimal(15, 2),
    `fee_third_part` Decimal(15, 2),
    `fee_third_part_real` Decimal(15, 2),
    `fee_member_point` Decimal(15, 2),
    `fee_member_benefit` Decimal(15, 2),
    `fee_etc` Decimal(15, 2),
    `fee_third_part_deduction` Decimal(15, 2),
    `fee_cash_by_pos` Decimal(15, 2),
    `fee_online` Decimal(15, 2),
    `fee_coupon` Decimal(15, 2),
    `paper_coupon_fee` Decimal(15, 2),
    `elec_coupon_fee` Decimal(15, 2),
    `paper_coupon_hours` Int64,
    `elec_coupon_hours` Int64,
    `free_reason` String,
    `free_amount` Decimal(15, 2),
    `fee_balance` Decimal(15, 2),
    `owner_id` Int64,
    `owner_name` String,
    `car_type` String,
    `note` String,
    `lift_by_hand` Int32,
    `in_exception_type` String,
    `out_exception_type` String,
    `status` Int32,
    `updated_at` DateTime64(3, 'Asia/Shanghai'),
    `created_at` DateTime64(3, 'Asia/Shanghai'),
    `description` String,
    `fee_exception` Decimal(15, 2),
    `fee_reserved1` Decimal(15, 2),
    `fee_reserved2` Decimal(15, 2)
)
ENGINE = ReplacingMergeTree(updated_at)
ORDER BY id
SETTINGS index_granularity = 8192;
- 创建物化视图，关联 Kafka 消费表和实体表
-- Materialized view: every block consumed by toll_detail_final_kafka is
-- inserted into toll_detail_final. The column order here differs from the
-- table definition; ClickHouse maps the TO-table columns by name.
CREATE MATERIALIZED VIEW view_tdf_consumer TO toll_detail_final AS
SELECT
    id,
    toll_detail_id,
    order_id,
    toll_duty_id,
    park_id,
    licence_plate,
    brand,
    toll_man_name,
    toll_man_id,
    in_time,
    out_time,
    event_time,
    in_location,
    in_camera_id,
    out_location,
    out_camera_id,
    fee_expected,
    fee_cash,
    fee_third_part,
    fee_third_part_real,
    fee_member_point,
    description,
    fee_etc,
    fee_third_part_deduction,
    fee_cash_by_pos,
    fee_online,
    fee_coupon,
    paper_coupon_fee,
    elec_coupon_fee,
    paper_coupon_hours,
    elec_coupon_hours,
    free_reason,
    free_amount,
    fee_balance,
    owner_id,
    owner_name,
    car_type,
    note,
    lift_by_hand,
    in_exception_type,
    out_exception_type,
    status,
    updated_at,
    created_at,
    fee_member_benefit,
    fee_exception,
    fee_reserved1,
    fee_reserved2
FROM toll_detail_final_kafka;
- 关联/卸载视图
-- Detach the view: stops moving rows from the Kafka table into toll_detail_final
-- (e.g. before altering the target table or the conversion logic).
DETACH VIEW view_tdf_consumer;
-- Re-attach the view: consumption resumes from the group's committed offsets.
ATTACH TABLE view_tdf_consumer;
Kafka 生产消息后，Kafka 消费表会拉取消息，物化视图再将其转换为实体表中的记录。
消息样式举例:
{"fee_exception":100.0,"out_exception_type":"","paper_coupon_fee":0.0,"fee_cash":0.0,"out_camera_id":1538,"owner_id":0,"description":"","created_at":1649317245039,"fee_reserved1":0.0,"fee_etc":0.0,"fee_reserved2":0.0,"elec_coupon_hours":0,"out_location":"车道2","car_type":"visitor","fee_expected":100.0,"fee_balance":0.0,"updated_at":1649315050000,"toll_man_name":"","fee_online":0.0,"elec_coupon_fee":0.0,"in_camera_id":1539,"id":5816289,"in_exception_type":"","brand":"","in_location":"车道1","free_amount":0.0,"out_time":1649223909000,"fee_third_part_real":0.0,"fee_member_benefit":0.0,"owner_name":"","toll_duty_id":"","paper_coupon_hours":0,"toll_detail_id":0,"park_id":2803,"fee_third_part":0.0,"fee_third_part_deduction":0.0,"licence_plate":"贵Z005A1","fee_coupon":0.0,"in_time":1649176380000,"fee_cash_by_pos":0.0,"fee_member_point":0.0,"order_id":"C2803153920220406003300143","event_time":1649314955000,"lift_by_hand":0,"status":2}