Kafka与ClickHouse


Kafka下载与安装

下载

官网下载 :

Apache Kafka (https://kafka.apache.org/downloads)

确保已经安装 JDK 8+ 环境,可使用 javac -version 命令验证

安装

解压

# Unpack the gzip-compressed release archive into /opt, listing files as they are extracted.
tar --extract --gzip --verbose --file=kafka_2.12-3.1.0.tgz --directory=/opt

建议建立软连接

# Create /opt/kafka as a version-independent symlink to the unpacked release.
# Absolute paths make the command independent of the current directory, and the
# link name carries no trailing slash: ln cannot create a link at "kafka/" when
# that path does not exist yet.
ln -s /opt/kafka_2.12-3.1.0 /opt/kafka

查看目录结构

▶ ll /opt/kafka/
total 64K
drwxr-xr-x. 3 root root 4.0K Jan 12 17:04 bin
drwxr-xr-x. 3 root root 4.0K Mar  8 14:36 config
drwxr-xr-x. 2 root root 8.0K Mar  8 14:32 libs
-rw-r--r--. 1 root root  15K Jan 12 17:01 LICENSE
drwxr-xr-x. 2 root root  262 Jan 12 17:04 licenses
drwxr-xr-x. 2 root root  237 Mar  8 14:41 logs
-rw-r--r--. 1 root root  28K Jan 12 17:01 NOTICE
drwxr-xr-x. 2 root root   44 Jan 12 17:04 site-docs

Kafka常用命令

管理

# Start ZooKeeper as a daemon
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# Start the Kafka broker as a daemon
bin/kafka-server-start.sh -daemon config/server.properties

## Create a topic (1 partition, 1 replica) -- Kafka < 2.2
## The --zookeeper option of kafka-topics.sh was removed in Kafka 3.0; use the
## --bootstrap-server form below on current releases.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

## Create a topic (1 partition, 1 replica) -- Kafka >= 2.2
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

## Increase the number of partitions of a topic
# Kafka < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

# Kafka >= 2.2 (replace broker_host:port with a real broker address)
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2

## Delete a topic
# Kafka < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

# Kafka >= 2.2
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test

查询

## Describe all topics -- Kafka < 2.2
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181

## Describe all topics -- Kafka >= 2.2
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092

## List topics -- Kafka < 2.2
bin/kafka-topics.sh --list --zookeeper localhost:2181

## List topics -- Kafka >= 2.2 (kafka-topics.sh accepts --bootstrap-server from 2.2 on)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

## List consumer groups of the new consumer (0.9 up to, but not including, 0.10.1.0)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

## List consumer groups of the new consumer (0.10.1.0+; --new-consumer is no longer needed)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

## Show consumption details of a consumer group (only for offsets stored in ZooKeeper)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test

## Show consumption details of a consumer group (0.9 up to, but not including, 0.10.1.0)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group

## Show consumption details of a consumer group (0.10.1.0+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

生产和消费

## Console producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

## Console consumer (old ZooKeeper-based consumer; legacy releases only)
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

## New producer (0.9+), with an explicit producer configuration file
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties

## New consumer (0.9+), reading the topic from the beginning with an explicit consumer configuration file
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --consumer.config config/consumer.properties --topic test

## kafka-verifiable-consumer.sh (prints consumer events, e.g. offset commits)
bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName

## More advanced usage: read at most 10 messages from partition 0, starting at offset 1234
## (the broker option is --broker-list)
bin/kafka-simple-consumer-shell.sh --broker-list localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10

ClickHouse使用Kafka外表进行数据的增量更新

kafka:

  1. 创建topic
# Create the topic consumed by the ClickHouse Kafka table. The script is
# kafka-topics.sh and the flag is --replication-factor (singular). The topic name
# must equal kafka_topic_list in the Kafka engine table ('toll_detail_final').
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic toll_detail_final

clickhouse:

  1. 创建Kafka消费表(Kafka引擎)
-- Kafka engine table: consumes JSONEachRow messages from the 'toll_detail_final'
-- topic as consumer group 'tdfConsumerGroup'.
CREATE TABLE default.toll_detail_final_kafka
(
    -- identifiers
    `id`                       Int64,
    `toll_detail_id`           Int64,
    `order_id`                 String,
    `toll_duty_id`             String,
    `park_id`                  Int64,
    `licence_plate`            String,
    `brand`                    String,
    `toll_man_name`            String,
    `toll_man_id`              Int64,

    -- timestamps and entry/exit locations
    `in_time`                  Nullable(DateTime64(3, 'Asia/Shanghai')),
    `out_time`                 Nullable(DateTime64(3, 'Asia/Shanghai')),
    `event_time`               DateTime64(3, 'Asia/Shanghai'),
    `in_location`              String,
    `in_camera_id`             Int64,
    `out_location`             String,
    `out_camera_id`            Int64,

    -- fee amounts
    `fee_expected`             Decimal(15, 2),
    `fee_cash`                 Decimal(15, 2),
    `fee_third_part`           Decimal(15, 2),
    `fee_third_part_real`      Decimal(15, 2),
    `fee_member_point`         Decimal(15, 2),
    `fee_member_benefit`       Decimal(15, 2),
    `fee_etc`                  Decimal(15, 2),
    `fee_third_part_deduction` Decimal(15, 2),
    `fee_cash_by_pos`          Decimal(15, 2),
    `fee_online`               Decimal(15, 2),
    `fee_coupon`               Decimal(15, 2),
    `paper_coupon_fee`         Decimal(15, 2),
    `elec_coupon_fee`          Decimal(15, 2),
    `paper_coupon_hours`       Int64,
    `elec_coupon_hours`        Int64,
    `free_reason`              String,
    `free_amount`              Decimal(15, 2),
    `fee_balance`              Decimal(15, 2),

    -- owner, vehicle and status
    `owner_id`                 Int64,
    `owner_name`               String,
    `car_type`                 String,
    `note`                     String,
    `lift_by_hand`             Int32,
    `in_exception_type`        String,
    `out_exception_type`       String,
    `status`                   Int32,
    `updated_at`               DateTime64(3, 'Asia/Shanghai'),
    `created_at`               DateTime64(3, 'Asia/Shanghai'),
    `description`              String,
    `fee_exception`            Decimal(15, 2),
    `fee_reserved1`            Decimal(15, 2),
    `fee_reserved2`            Decimal(15, 2)
)
ENGINE = Kafka()
SETTINGS
    -- NOTE(review): 120.0.0.1 looks like a typo for 127.0.0.1 (the Kafka commands
    -- in these notes use localhost:9092) -- confirm the broker address.
    kafka_broker_list = '120.0.0.1:9092',
    kafka_topic_list = 'toll_detail_final',
    kafka_group_name = 'tdfConsumerGroup',
    kafka_format = 'JSONEachRow',
    -- tolerate up to 10 unparsable messages per block instead of failing
    kafka_skip_broken_messages = 10;
  1. 创建实体表实际存数据(MergeTree引擎)

已经创建的tdf和tt表(以toll_detail_final表举例)

-- Storage table that holds the toll detail rows.
-- ReplacingMergeTree(updated_at) with ORDER BY id: rows sharing the same id are
-- collapsed during background merges, keeping the row with the greatest
-- updated_at. Deduplication is eventual; query with FINAL (or aggregate) when
-- exactly one row per id is required.
CREATE TABLE default.toll_detail_final
(
    `id` Int64,
    `toll_detail_id` Int64,
    `order_id` String,
    `toll_duty_id` String,
    `park_id` Int64,
    `licence_plate` String,
    `brand` String,
    `toll_man_name` String,
    `toll_man_id` Int64,
    `in_time` Nullable(DateTime64(3, 'Asia/Shanghai')),
    `out_time` Nullable(DateTime64(3, 'Asia/Shanghai')),
    `event_time` Nullable(DateTime64(3, 'Asia/Shanghai')),
    `in_location` String,
    `in_camera_id` Int64,
    `out_location` String,
    `out_camera_id` Int64,
    `fee_expected` Decimal(15, 2),
    `fee_cash` Decimal(15, 2),
    `fee_third_part` Decimal(15, 2),
    `fee_third_part_real` Decimal(15, 2),
    `fee_member_point` Decimal(15, 2),
    `fee_member_benefit` Decimal(15, 2),
    `fee_etc` Decimal(15, 2),
    `fee_third_part_deduction` Decimal(15, 2),
    `fee_cash_by_pos` Decimal(15, 2),
    `fee_online` Decimal(15, 2),
    `fee_coupon` Decimal(15, 2),
    `paper_coupon_fee` Decimal(15, 2),
    `elec_coupon_fee` Decimal(15, 2),
    `paper_coupon_hours` Int64,
    `elec_coupon_hours` Int64,
    `free_reason` String,
    `free_amount` Decimal(15, 2),
    `fee_balance` Decimal(15, 2),
    `owner_id` Int64,
    `owner_name` String,
    `car_type` String,
    `note` String,
    `lift_by_hand` Int32,
    `in_exception_type` String,
    `out_exception_type` String,
    `status` Int32,
    `updated_at` DateTime64(3, 'Asia/Shanghai'),
    `created_at` DateTime64(3, 'Asia/Shanghai'),
    `description` String,
    `fee_exception` Decimal(15, 2),
    `fee_reserved1` Decimal(15, 2),
    `fee_reserved2` Decimal(15, 2)
)
ENGINE = ReplacingMergeTree(updated_at)
ORDER BY id
SETTINGS index_granularity = 8192;
  1. 创建物化视图,联系kafka消费表和实体表
-- Materialized view that forwards every row read from the Kafka table
-- toll_detail_final_kafka into the storage table toll_detail_final.
-- NOTE(review): the column order below differs from the target table's column
-- order; this presumably relies on name-based column mapping -- confirm.
CREATE MATERIALIZED VIEW view_tdf_consumer TO toll_detail_final AS
SELECT
    id,
    toll_detail_id,
    order_id,
    toll_duty_id,
    park_id,
    licence_plate,
    brand,
    toll_man_name,
    toll_man_id,
    in_time,
    out_time,
    event_time,
    in_location,
    in_camera_id,
    out_location,
    out_camera_id,
    fee_expected,
    fee_cash,
    fee_third_part,
    fee_third_part_real,
    fee_member_point,
    description,
    fee_etc,
    fee_third_part_deduction,
    fee_cash_by_pos,
    fee_online,
    fee_coupon,
    paper_coupon_fee,
    elec_coupon_fee,
    paper_coupon_hours,
    elec_coupon_hours,
    free_reason,
    free_amount,
    fee_balance,
    owner_id,
    owner_name,
    car_type,
    note,
    lift_by_hand,
    in_exception_type,
    out_exception_type,
    status,
    updated_at,
    created_at,
    fee_member_benefit,
    fee_exception,
    fee_reserved1,
    fee_reserved2
FROM toll_detail_final_kafka;
  1. 关联/卸载视图
-- Detach the view (e.g. to stop receiving topic data or to change the conversion logic):
DETACH VIEW view_tdf_consumer;

-- Attach the view again:
ATTACH TABLE view_tdf_consumer;

kafka生产消息,kafka消费表会拉取消息,物化视图将它转换为实体表中的记录。

消息样式举例:

{"fee_exception":100.0,"out_exception_type":"","paper_coupon_fee":0.0,"fee_cash":0.0,"out_camera_id":1538,"owner_id":0,"description":"","created_at":1649317245039,"fee_reserved1":0.0,"fee_etc":0.0,"fee_reserved2":0.0,"elec_coupon_hours":0,"out_location":"车道2","car_type":"visitor","fee_expected":100.0,"fee_balance":0.0,"updated_at":1649315050000,"toll_man_name":"","fee_online":0.0,"elec_coupon_fee":0.0,"in_camera_id":1539,"id":5816289,"in_exception_type":"","brand":"","in_location":"车道1","free_amount":0.0,"out_time":1649223909000,"fee_third_part_real":0.0,"fee_member_benefit":0.0,"owner_name":"","toll_duty_id":"","paper_coupon_hours":0,"toll_detail_id":0,"park_id":2803,"fee_third_part":0.0,"fee_third_part_deduction":0.0,"licence_plate":"贵Z005A1","fee_coupon":0.0,"in_time":1649176380000,"fee_cash_by_pos":0.0,"fee_member_point":0.0,"order_id":"C2803153920220406003300143","event_time":1649314955000,"lift_by_hand":0,"status":2}