mongoDB
复制集
在
MondDB
的主从复制集架构中:
!!! warning 复制集的职能
- 数据冗余:MongoDB复制集通过在多个服务器间自动复制数据实现数据冗余,适用于需要数据备份和灾难恢复的场景。
- 高可用性:MongoDB复制集通过在主节点故障时自动切换到副本节点确保高可用性,适用于需要无中断服务的生产环境。
- 自动故障切换:MongoDB复制集在检测到主节点故障时自动将副本节点提升为主节点,实现快速故障恢复,适用于需要高可靠性的应用程序。
!!!
原理
基本信息
- 主从关系
- 仅有
一个主节点
,其余都是从节点
- 只有主节点能够
写入/读取
- 从节点能
读取数据
,但默认只有主节点负责处理请求
- 仅有
- 常见架构
- 一主一从
- 一主二从
- 复制本质
- 所有的操作都有
oplog
,从节点定期轮询主节点获取这些操作
,然后对自己的数据副本执行这些操作
。
- 所有的操作都有
- 主要特征
- N 个
奇数节点
的集群 - 基于选举机制,
任何节点可作为主节点
- 所有写入操作都在主节点上,所以增加节点
不会提高系统写性能
,可以提升读性能
- 主节点故障时,会
自动选举出新节点代替
,自动故障转移
- N 个
选举方式
选举算法:
Raft
!!! info 角色与操作
- 主节点 Master
- 数量:1
- 作用:
- 默认回应
读写请求
- 与
副本节点
通信同步操作,同步存活信息
- 默认回应
- 失效后:
- 触发选举执行
- 副本节点 Secondary
- 数量:N (无仲裁者下 N+1 应为奇数)
- 最多只能有
7个节点有选举权
,但副本可大于该数值
- 最多只能有
- 作用:
- 执行主节点下发操作,维护副本数据
- 当主节点故障时,触发选举选定下一个
Master
- 失效后:
- 不被关心
- 数量:N (无仲裁者下 N+1 应为奇数)
- 仲裁者 Arbiter
- 数量:M (M+N+1 应为奇数)
- 作用:
- 仅参与选主投票
- 不保存数据
- 失效后:
- 不被关心
!!!
- 不被关心
!!! success 选举流程
- 具有投票权的节点之间两两互相发送心跳
- 当
5次心跳
未收到时判断为节点失联 - 如果失联的是主节点,从节点会发起选举,选出新的主节点
- 如果失联的是从节点则不会产生新的选举
- 选举基于RAFT一致性算法实现,选举成功的必要条件是大多数投票节点存活
- 复制集中最多可以有50个节点,但具有投票权的节点最多7个,且为奇数个投票成员
!!!
!!! warning priority0 节点
- 图解
- 说明
- 默认Priority为1,值最大优先级越高。
- 设置Priority为0节点的选举优先级为0,
不会被选举为Primary
,但可以投票
- 身份特征
priority:0
- 常见场景
- 跨机房A、B部署了一个复制集,并且希望
Primary
必须在 A机房;这时可以将B机房的复制集成员Priority设置为0
!!!
- 跨机房A、B部署了一个复制集,并且希望
!!! warning hidden 节点
- 图解
- 说明
- 不接受
Driver
的请求 不参与选主
,也不对外提供服务
。
- 不接受
- 身份特征
- priority:0
hiddent: true
- 常见场景
- 做一些数据备份、离线计算的任务,不会影响复制集的服务
!!!
- 做一些数据备份、离线计算的任务,不会影响复制集的服务
!!! warning Delayed 节点
- 图解
- 说明
- Delayed节点必须是Hidden节点,并且其数据落后与Primary一段时间
不应该提供服务或参与选主
- 身份特征
- priority: 0
- hidden: ture
slaveDelay: 3600
- 常见场景
- 延时节点的数据集是延时的,因此它可以帮助我们在人为
误操作或是其他意外情况下恢复数据
。 - 当应用升级失败,或是误操作删除了表和数据库时,可以通过延时节点进行数据恢复
!!!
- 延时节点的数据集是延时的,因此它可以帮助我们在人为
故障自恢复
Master
发生故障- 复制集内部会进行投票选举
- 一个
Secondary
替代原有主库对外提供服务 - 复制集会自动通知客户端程序主库已切换
- 应用就会连接到新的主库
请求模式
默认情况下,应用程序将其读取操作指向复制集的主节点
但是,客户端可以通过read preference
模式指定将读取操作发送给到从节点
Read Preference Mode | Description |
---|---|
primary | 主节点,默认模式,读操作只在主节点,如果主节点不可用,报错或者抛出异常。 |
primaryPreferred | 首选主节点,大多情况下读操作在主节点,如果主节点不可用,如故障转移,读操作在从节点。 |
secondary | 从节点,读操作只在从节点, 如果从节点不可用,报错或者抛出异常。 |
secondaryPreferred | 首选从节点,大多情况下读操作在从节点,特殊情况(如单主节点架构)读操作在主节点。 |
nearest | 最邻近节点,读操作在最邻近的成员,可能是主节点或者从节点。 |
案例操作
- 安装
mongodb
# 关闭防火墙和SELinux
setenforce 0
systemctl stop firewalld
# 调整内核HPG
cat >> /etc/rc.local <<EOF
echo never > /sys/kernel/mm/transparent hugepage/enabled
EOF
chmod a+x /etc/rc.local
# 创建用户
useradd mongod
# 创建目录
mkdir -p /mongodb/{conf,data,log}
# 创建配置文件
cat > /mongodb/conf/mongo.conf <<EOF
systemLog:
destination: file
path: /mongodb/log/mongodb.log
logAppend: true
storage:
dbPath: /mongodb/data/
processManagement:
timeZoneInfo: /usr/share/zoneinfo
fork : true
net:
port: 27017
bindIp: 0.0.0.0
security:
authorization: disabled
replication:
replSetName: alfieRepl
EOF
# 下载源代码
wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel80-7.0.11.tgz
# 解压包
tar xf mongodb-linux-x86_64-rhel80-7.0.11.tgz -C /usr/local
ln -s /usr/local/mongodb-linux-x86_64-rhel80-7.0.11/ /usr/local/mongodb
# 设置PATH变量
echo PATH=/usr/local/mongodb/bin/:'$PATH' > /etc/profile.d/mongodb.sh
. /etc/profile.d/mongodb.sh
# systemd管理文件
cat > /lib/systemd/system/mongod.service <<EOF
[Unit]
Description=mongodb
After=network.target remote-fs.target nss-lookup.target
[Service]
Type=forking
User=mongod
Group=mongod
ExecStart=/usr/local/mongodb/bin/mongod --config /mongodb/conf/mongo.conf
ExecReload=/bin/kill -s HUP \$MAINPID
ExecStop=/usr/local/mongodb/bin/mongod --config /mongodb/conf/mongo.conf -- shutdown
PrivateTmp=true
LimitFSIZE=infinity
LimitCPU=infinity
LimitAS=infinity
LimitNOFILE=64000
LimitNPROC=64000
LimitMEMLOCK=infinity
TasksMax=infinity
TasksAccounting=false
[Install]
WantedBy=multi-user.target
EOF
# 文件归属修改
chown -R mongod:mongod /usr/local/mongodb/
chown -R mongod:mongod /mongodb/
# 启动服务
systemctl daemon-reload
systemctl enable --now mongod.service
- 配置一主二从
mongo --port 27017 admin
config = { _id: 'alfieRepl', members: [
{_id: 0, host: '192.168.100.101:27017'},
{_id: 1, host: '192.168.100.102:27017'} ,
{_id: 2, host: '192.168.100.103:27017'}]
}
printjson(config)
rs.initiate(config)
>> OR
rs.initiate ( )
rs.add("ip.ip.ip.ip:port")
...
- 配置一主一从一仲裁
config = { _id: 'alfieRepl', members: [
{_id: 0, host: '192.168.100.101:27017'},
{_id: 1, host: '192.168.100.102:27017'} ,
{_id: 2, host: '192.168.100.103:27017', "arbiterOnly": true}]
}
rs.initiate(config)
- 复制集状态
- rs.hello() // 常规信息
- rs.isMaster() // 查主
- rs.config() // 配置信息
- rs.printSecondaryReplicationInfo()
- db.printReplicationInfo()
管理操作
节点管理
-
rs.remove ("ip: port") // 删除节点
-
rs.add ("ip: port") // 增加节点
-
rs.addArb ("ip: port") // 增加arb节点
特殊节点管理
// 获得当前复制集点配置
config=rs.conf()
// 对节点属性进行修改
// 修改节点3为hidden节点
config.members[2].hidden=true
- config.members[2].hidden=true
- 从Hideen节点恢复为常规节点
- config.members[2].hidden=false
- config.members[2].arbiterOnly=false
- config.members[2].priority=1
- 由于与此前配置不同需要通过reconfig,删除再添加节点
主节点降级
- rs.stepDown()
开启从库读取
#打开从节点读支持
#7版本命令
db.getMongo().setReadPref('secondary')
db.getMongo().getReadPref()
#5版命令
myrepl:SECONDARY> rs.secondaryOk()
#旧版命令已废弃
myrepl:SECONDARY> rs.slaveOk()
- rs.secondaryOK() 已被抛弃
- 7版本的切换命令
zookeeper
集群架构
!!! tip 基本信息
-
集群模型
- Master/Slave 模型
-
Master
- 负责写操作,同时被称为
Leader
节点
- 负责写操作,同时被称为
-
Slave
- 负责读操作,同时被称为
follower
节点
- 负责读操作,同时被称为
-
写操作同步
- 写操作由
Master
处理完成,再同步给Slave
节点 - 当写操作大于半数节点时,
写操作
判定为成功
- 写操作由
-
可用性
- 当
可用节点 > (总节点数/2)
才认定zookeeper
系统可用
- 当
-
性能
增加服务器数量可以提高读请求处理能力
,但其效果在服务器数量增加时逐渐减弱。
!!!
集群角色
序号 | 状态 | 角色 | 职责描述 |
---|---|---|---|
1 | 稳定状态 | 领导者(Leader) | 负责处理写入请求的,事务请求的唯一调度和处理者 ,负责进行投票发起和决议,更新系统状态 |
2 | 稳定状态 | 跟随者(Follower) | 接收客户请求并向客户端返回结果,在选Leader过程中参与投票 |
3 | 稳定状态 | 观察者(Observer) | 转交客户端写请求给leader节点,和同步leader状态。 和Follower唯一区别就是 不参与Leader投票 ,也不参与写操作的"过半写成功"策略 |
4 | 中间状态 | 学习者(Learner) | 和leader进行状态同步的节点统称Learner ,包括:Follower和Observer |
5 | NA | 客户端(client) | 请求发起方 |
选举过程
!!! info 选举状态和依据
- 状态
LOOKING
:寻找Leader
状态,处于该状态需要进入选举流程LEADING
:领导者状态,处于该状态的节点说明是角色已经是Leader
FOLLOWING
:跟随者状态,表示Leader
已经选举出来,当前节点角色是follower
OBSERVER
:观察者状态,表明当前节点角色是observer
- 依据
zxid
的优先级高于myid
,zxid相同时
比较myid
大小zxid
(zookeeper transaction id):- 产生方式:代表了该服务器处理的
最后一个事务的ID
- 值含义:值越大表示它的
数据最完整
,最新
zxid
最大的节点优先选为Leader
- 产生方式:代表了该服务器处理的
myid
服务器的唯一标识(SID):- 产生方式:通过配置
myid 文件指定
- 值含义:没特定意义,常用于
人为操作选举
myid
最大的节点优先选为Leader
!!!
- 产生方式:通过配置
!!! success 首次选举
0. 配置文件会决定有选举权限的 zookeeper 节点
- 每个zookeeper 的投票中都会
包含自己的myid和zxid
- 每个节点接受并检查对方的投票信息,比如投票时间、是否状态为
LOOKING状态的投票
- 对比投票,优先检查zxid,如果zxid 不一样则 zxid 大的为leader
- 如果zxid相同则继续对比myid,myid 大的一方为 leader
sequenceDiagram participant ZK1 as ZooKeeper 1 <br>(myid=1, zxid=0x0) participant ZK2 as ZooKeeper 2 <br>(myid=2, zxid=0x0) participant ZK3 as ZooKeeper 3 <br>(myid=3, zxid=0x0) ZK1->>All: 发出投票 (myid=1, zxid=0x0) ZK2->>All: 发出投票 (myid=2, zxid=0x0) ZK3->>All: 发出投票 (myid=3, zxid=0x0) Note right of All: 每个节点接收并检查对方的投票 alt 比较 zxid ZK1->>ZK2: 检查投票 (zxid=0x0) ZK1->>ZK3: 检查投票 (zxid=0x0) ZK2->>ZK1: 检查投票 (zxid=0x0) ZK2->>ZK3: 检查投票 (zxid=0x0) ZK3->>ZK1: 检查投票 (zxid=0x0) ZK3->>ZK2: 检查投票 (zxid=0x0) Note right of All: 所有 zxid 相同,比较 myid end alt 比较 myid ZK1->>ZK2: 检查 myid (myid=1) ZK1->>ZK3: 检查 myid (myid=1) ZK2->>ZK1: 检查 myid (myid=2) ZK2->>ZK3: 检查 myid (myid=2) ZK3->>ZK1: 检查 myid (myid=3) ZK3->>ZK2: 检查 myid (myid=3) Note right of All: ZK3 的 myid 最大,成为 leader end ZK1->>ZK3: 选出 ZK3 作为 Leader ZK2->>ZK3: 选出 ZK3 作为 Leader ZK3->>All: 宣布成为 Leader
!!!
!!! warning 心跳保持
- 保持方式
- 利用
ping
确认对方是否存活
- 利用
- 触发重新选举
- 当
Leader无法响应PING
时,将重新发起 Leader 选举
- 当
- Leader无法响应的原因
- 网络阻塞
- 网络中断
- keepavlive程序故障
- 系统崩溃重启
sequenceDiagram participant Leader participant Follower1 participant Follower2 Note right of Leader: Leader 定期发送心跳消息 (PING) Leader->>Follower1: 发送 PING Leader->>Follower2: 发送 PING Note right of Follower1: Follower1 返回心跳确认 (ACK) Follower1-->>Leader: 返回 ACK Note right of Follower2: Follower2 返回心跳确认 (ACK) Follower2-->>Leader: 返回 ACK Note right of Leader: Leader 检查心跳确认 (ACK) alt 超时未收到 ACK Note right of Leader: 标记 Follower 为失效 Leader->>Follower1: 重新尝试 PING 或标记失效 else 收到 ACK Note right of Leader: 标记 Follower 正常 end Note right of Follower1: Follower1 定期发送心跳消息 (PING) Follower1->>Leader: 发送 PING Note right of Follower2: Follower2 定期发送心跳消息 (PING) Follower2->>Leader: 发送 PING Note right of Leader: 返回心跳确认 (ACK) Leader-->>Follower1: 返回 ACK Leader-->>Follower2: 返回 ACK Note right of Follower1: Follower1 检查心跳确认 (ACK) alt 超时未收到 ACK Note right of Follower1: Follower1 进入 LOOKING 状态 Follower1->>Follower1: 开始新的 Leader 选举 else 收到 ACK Note right of Follower1: 标记 Leader 正常 end Note right of Follower2: Follower2 检查心跳确认 (ACK) alt 超时未收到 ACK Note right of Follower2: Follower2 进入 LOOKING 状态 Follower2->>Follower2: 开始新的 Leader 选举 else 收到 ACK Note right of Follower2: 标记 Leader 正常 end
!!!
!!! failure 重新选举
- 异常情况出现
- ZAB(Zookeeper Atomic Broadcast) 协议就会进入恢复模式并选举产生新的Leader服务器
- ZAB协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。
- 在ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性
- 基于该协议,ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性。
- ZAB(Zookeeper Atomic Broadcast) 协议就会进入恢复模式并选举产生新的Leader服务器
sequenceDiagram participant ZK1 as ZooKeeper 1 participant ZK2 as ZooKeeper 2 participant ZK3 as ZooKeeper 3 participant Client as Client Note right of All: Leader Election(选举阶段) ZK1->>All: 发出选举投票 (myid=1, zxid=0x0) ZK2->>All: 发出选举投票 (myid=2, zxid=0x0) ZK3->>All: 发出选举投票 (myid=3, zxid=0x0) Note right of All: ZK3 得到超半数票数,成为准 leader ZK1->>ZK3: 投票 (myid=1) ZK2->>ZK3: 投票 (myid=2) Note right of All: Discovery(发现阶段) ZK1->>ZK3: 同步最近接收的事务提议 ZK2->>ZK3: 同步最近接收的事务提议 ZK3->>ZK1: 确认接收的事务提议 ZK3->>ZK2: 确认接收的事务提议 Note right of All: Synchronization(同步阶段) ZK3->>ZK1: 同步最新提议历史 ZK3->>ZK2: 同步最新提议历史 ZK1->>ZK3: 同步完成确认 ZK2->>ZK3: 同步完成确认 Note right of All: 准 leader 成为真正的 leader ZK3->>All: 宣布成为 Leader Note right of All: Broadcast(广播阶段) ZK3->>ZK1: 广播事务 ZK3->>ZK2: 广播事务 ZK1->>ZK3: 确认事务 ZK2->>ZK3: 确认事务 Client->>ZK3: 发送请求 ZK3->>ZK1: 广播请求 ZK3->>ZK2: 广播请求 ZK1->>ZK3: 确认请求 ZK2->>ZK3: 确认请求 ZK3->>Client: 返回响应 Note right of All: 新节点加入同步 participant NewZK as 新节点 NewZK->>ZK3: 请求加入 ZK3->>NewZK: 同步最新提议历史 NewZK->>ZK3: 同步完成确认
!!!
事务日志与快照
- 事务产生
- 当集群收到
写
操作时,请求将被转给Leader
Leader
将把写
操作转换为带有状态的事务
- 当集群收到
- 事物处理
Leader
对该写
操作进行广播以便进行协调- 当超过半数节点写入
表示协调通过
Leader
将通知服务器节点将不能次写操作应用到内存
数据库中- 将该记录写入到
事务日志
中
- 快照的产生
- 当
事务日志
达到一定额数量(如10万次) - 将
内存数据库序列化
永久保存到磁盘中 - 序列化后的文件称为
快照
- 拍快照的同时会生成
事务日志
- 当
sequenceDiagram participant Client as Client participant Follower1 as ZooKeeper Follower1 participant Follower2 as ZooKeeper Follower2 participant Leader as ZooKeeper Leader Note right of Client: 发送写操作请求 Client->>Follower1: 发送写操作请求 Note right of Follower1: 将请求转发给Leader Follower1->>Leader: 转发写操作请求 Note right of Leader: Leader 处理写操作 Leader->>Leader: 将写操作转换为带有状态的事务 Note right of Leader: 广播事务给所有Follower Leader->>Follower1: 广播事务 Leader->>Follower2: 广播事务 Note right of All: Follower 接收并确认事务 Follower1->>Leader: 确认事务 Follower2->>Leader: 确认事务 Note right of Leader: Leader 收集多数确认 alt 大多数节点允许写操作 Leader->>All: 通知所有节点应用写操作 Leader->>Follower1: 应用写操作 Leader->>Follower2: 应用写操作 Note right of Follower1: 将写操作应用到内存数据库并记录到事务日志 Follower1->>Follower1: 应用写操作并记录到事务日志 Note right of Follower2: 将写操作应用到内存数据库并记录到事务日志 Follower2->>Follower2: 应用写操作并记录到事务日志 Leader->>Leader: 将写操作应用到内存数据库并记录到事务日志 Note right of All: 客户端收到写操作成功响应 Leader->>Client: 返回写操作成功响应 Note right of All: 事务日志记录达到一定次数 alt 达到事务日志阈值 (默认10W次) Note right of All: 将内存数据库序列化保存到磁盘上 Leader->>Disk: 生成快照文件 Follower1->>Disk: 生成快照文件 Follower2->>Disk: 生成快照文件 end else 未达到大多数确认 Note right of Leader: Leader 返回写操作失败响应 Leader->>Client: 返回写操作失败响应 end
环境要求
# 最低JDK版本为1.8
yum install java-1.8.0-openjdk
# JDK 11 可使用以下命令
yum install java-11-openjdk
集群部署
名称 | IP | zxid | 角色 |
---|---|---|---|
s1 | 192.168.100.101 | 1 | Follower |
s2 | 192.168.100.102 | 2 | Follower |
s2 | 192.168.100.103 | 3 | Leader |
集群安装
# 最低JDK版本为1.8
yum install java-1.8.0-openjdk
# 下载二进制
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.9.0/apache-zookeeper-3.9.0-bin.tar.gz
# 解压包
tar xf apache-zookeeper-3.9.0-bin.tar.gz -C /usr/local/
ln -s /usr/local/apache-zookeeper-3.9.0-bin /usr/local/zookeeper
# 路径追加
echo 'PATH=/usr/local/zookeeper/bin:$PATH' > /etc/profile.d/zookeeper.sh
source /etc/profile.d/zookeeper.sh
# 配置文件获得
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
mkdir /usr/local/zookeeper/data
# 修改配置文件 /usr/local/zookeeper/conf/zoo.cfg
# 格式: server.MyID服务器唯一编号=服务器IP:Leader和Follower的数据同步端口
sed -i -e 's!dataDir=.*!dataDir=/usr/local/zookeeper/data!' \
-e '$ a server.1=192.168.100.101:2888:3888\nserver.2=192.168.100.102:2888:3888\nserver.3=192.168.100.103:2888:3888' \
/usr/local/zookeeper/conf/zoo.cfg
# 生成ZXID文件
# 各个myid文件的内容要和zoo.cfg文件相匹配
# server1 为1, server2 为2, server3 为3
echo 1 > /usr/local/zookeeper/data/myid
# 启动服务
zkServer.sh start
日志检查
/usr/local/zookeeper/logs/zookeeper-root-server-s1.out
端口检查
状态检查
- slave 状态,监听
3888
端口
- master 状态,监听
2888
,3888
端口
命令行访问
- 进入交互命令
- zkCli.sh -server node:2181
- zkCli.sh -server node:2181
- 创建节点/查看节点/修改节点/删除节点(默认持久节点)
- 查看节点元数据
- 查看配置
GUI访问
# 准备终端转发环境
dnf install xauth -y
ssh -X root@192.168.100.101
# 准备maven环境
dnf install maven -y
/etc/maven/settings.xml
<mirrors>
<mirror>
<id>aliyun</id>
<mirrorOf>*</mirrorOf>
<name>Nexus for aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
</mirrors>
# 下载源代码
dnf install -y git
git clone https://github.com/zzhang5/zooinspector.git
cd zooinspector
# 编译安装
mvn clean package -Dmaven.test.skip=true
# 图形界面启动
chmod +x target/zooinspector-pkg/bin/zooinspector.sh
target/zooinspector-pkg/bin/zooinspector.sh
python SDK访问
- 准备工作
# 安装python和相关库 dnf install python3 pip3 install kazoo
- 测试调用
#!/usr/bin/python3 from kazoo.client import KazooClient zk = KazooClient(hosts='192.168.100.101:2181') zk.start() # 创建节点:makepath 设置为 True ,父节点不存在则创建,其他参数不填均为默认 zk.create('/zkapp/test',b'this is a test',makepath=True) # 操作完后关闭zk连接 data=zk.get('/zkapp/test') print(data) zk.stop()
kafka
消息队列
软件之间互相通信如今就像呼吸一样简单,但回到
80年代
这是一个头疼的事情。
不同软件之间通信需要实现不同的协议,软件通信没有任何组织可言,面对这个局面。
有人率先打开局面,既然主机有总线
,那么软件之间也可以有,这便诞生了消息队列
消息队列
的出现使软件通信出现的曙光,但很快大公司也进来了,各家的消息队列
粉末登场各部兼容
在这个背景下,规范组织插手推出AMQP
,消息队列迎来了规范化发展时代
主流的消息队列工具:
Kafka
、RabbitMQ
、ActiveMQ
、RocketMQ
等
!!! tip MQ摘要信息
- 本质
- 消息队列是一种异步的服务间通信方式
- 适用场合
- 适用于无服务器和微服务架构。
- 使用方式
- 消息在被处理和删除之前一直存储在队列上,每条消息仅可被一位用户处理一次用户处理一次。
- 业务场景
- 重量级访问请求处理(削峰填谷)
- 请求按时排队(顺序收发)
- 异步通信需求(异步解耦)
- 数据分析处理(大数据分析)
- 缓存信息降低数据库压力(分布式缓存同步)
- 链路访问压力测试(蓄流压测)
!!!
简介
Kafka 是一个
事件流平台
(event streaming platform)
- 用于
发布
(写入)和订阅
(读取)事件流,包括从其他系统持续导入/导出数据。 - 用于持久可靠地
存储事件流
,只要你需要可以一直保存。 - 用于在
事件发生时
或事后
处理事件流。
!!! tip kafka摘要信息
-
开发语言:
scala
和java
-
MQ对别
-
特点
分布式
: 多机实现,不允许单机分区
: 一个消息.可以拆分出多个,分别存储在多个位置多副本
: 防止信息丢失,可以多来几个备份多订阅者
: 可以有很多应用连接kafkaZookeeper
: 早期版本的Kafka依赖于zookeeper,v2.8.0
(2021) 后可单独部署
-
优势
- 优秀的数据结构: 通过
O(1)
的磁盘数据结构提供消息的持久化 - 高吞吐量: 支持每秒数百万的消息
- 分布式: 基于分布式集群实现高可用的容错机制,可以实现
自动的故障转移
- 顺序保证: 保证数据会按照特定的顺序来处理
- 大数据友好: 支持 Hadoop
并行数据加载
- 优秀的数据结构: 通过
-
常用场景
- 大数据
!!!
- 大数据
角色流程
角色
!!! info 操作
- Producer (写)
- Producer即生产者,消息的产生者,是消息的入口。负责发布消息到Kafka
- Consumer (读)
- 消费者,用于消费消息,即处理消息
!!!
- 消费者,用于消费消息,即处理消息
!!! info 对象
- Broker
- Broker是kafka实例,每个服务器上可以有一个或多个kafka的实例
- Broker需要通过
唯一的
编号区分
- Topic
- 消息的主题,可以理解为消息的分类,
数据库上的一张表
- 一个消息即为
topic
上的一条记录 - 一个
broker
对应多个topic
- 一个
topic
可对应多个broker
(分布式存放) consumer
通过topic
访问数据,不关系存放地broker
- 消息的主题,可以理解为消息的分类,
- consumer group
consumer
需要属于一个
特定的组(不指定则为默认组)- 同一个
topic
的一条消息只能被一个consumer group
一个consuer
消费 - 多个
consumer group
可同时消费一个消息
!!!
!!! info 存储方式
- Partition -- 分区存储提高读取
topic
可以拆分为一个或多个
Partition
- 创建
topic
可指定partition
数量 - 表现形式为
文件夹
- 目的是实现
负载均衡
,提高吞吐量
- 同一个
topic
在不同的partition
唯一 partition
数量应该不超过节点数量
partition
没有顺序
- Replication -- 分片备份提高可用性
- 本质是
同样数据的副本
- 建议设定至少
2
个 kafka
的副本数量包含主分片数
- 分区角色:
AR
(总分片):Assigned Replicas
分区中的所有副本的统称,包括leader和 follower- AR= lSR+ OSR
ISR
(可用分片):ln Sync Replicas
与leader副本保持同步的副本 本 follower和leader本身组成的集合OSR
(不可用分片):out-of-Sync Replied
与leader副本同步不能同步的 follower的集合
!!!
- 本质是
流程
produceer
从集群中获知leader
信息producer
将消息
发送给leader
leader
将消息写入本地文件follower
从leader
处pull
同步消息
follower
将消息
写入的确认返回给leader
leader
收到所有replication
的确认后,返回producer
回复消息已写入
部署
名称 | IP |
---|---|
s1 | 192.168.100.101 |
s2 | 192.168.100.102 |
s2 | 192.168.100.103 |
!!! warning 版本名称
kafka由
scale
开发,scale
存在多版本,因此其版本名称会包含scale
版本呢
kafka_<scala 版本>-<kafka 版本>
目前主流scale
版本2.12 / 2.13
!!!
# java环境准备 Java8
yum install java-1.8.0-openjdk
# 下载kafka包
wget https://downloads.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz
# 解压链接
tar xf kafka_2.13-3.6.2.tgz -C /usr/local/
ln -s /usr/local/kafka_2.13-3.6.2/ /usr/local/kafka
# 配置PATH
echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
source /etc/profile.d/kafka.sh
# 修改配置
vim /usr/local/kafka/config/server.properties
#每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的meta.properties文件 --> 节点定制修改
broker.id=1
#指定当前主机的IP做为监听地址,注意:不支持0.0.0.0 ---> 节点定制修改
listeners=PLAINTEXT://192.168.100.101:9092
#kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
log.dirs=/usr/local/kafka/data
#设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
num.partitions=1
#指定默认的副本数为3,可以实现故障的自动转移
default.replication.factor=3
#设置kafka中消息保留时间,默认为168小时即7天
log.retention.hours=168
#指定连接的zk的地址,zk中存储了broker的元数据信息 --> 设置相同即可
zookeeper.connect=192.168.100.101:2181,192.168.100.102:2181,192.168.100.103:2181
#设置连接zookeeper的超时时间,单位为ms,默认6秒钟
zookeeper.connection.timeout.ms=6000
# 准备数据目录
mkdir /usr/local/kafka/data
# 调整JAVA内存
vim /usr/local/kafka/bin/kafka-server-start.sh
if[ " x$KAFKA_HEAP_OPTS"="x"] ; then
export KAFKA_HEAP_OPTS=" -Xmx1G -Xms1G"
fi
# 启动服务
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# 错误查看文件
/usr/local/kafka/logs/kafkaServer.out
# 关闭服务,使用systemd接管
kafka-server-stop.sh
# 准备service文件
cat > /lib/systemd/system/kafka.service <<EOF
[Unit]
Description=Apache kafka
After=network.target
[Service]
Type=simple
PIDFile=/usr/local/kafka/kafka.pid
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/bin/kill -TERM
Restart=always
RestartSec=20
[Installed]
wantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl start kafka.service
检查
zookeeper 此时多出 kafka的信息
操作
写入操作
创建topic
# 创建一个名为 `alfie` 的topic
# 为其创建 3 个分区, 每个分区2个副本 => (3分区,每个分区存放1个主分片,1个备用分片)
kafka-topics.sh --create --topic alfie \
--bootstrap-server 192.168.100.101:9092 \
--partitions 3 --replication-factor 2
发送消息
kafka-console-producer.sh --broker-list \
192.168.100.101:9092,192.168.100.102:9092,192.168.100.103:9092 \
--topic alfie
< 以下来自标准输入 >
> alfie-msg1
> alfie-msg2
>...
删除topic
kafka-topics.sh --delete \
--bootstrap-server 192.168.100.101:9092,192.168.100.102:9092,192.168.100.103:9092 \
--topic alfie
读取数据
查看topic
kafka-topics.sh --list --bootstrap-server 192.168.100.101:9092
> alfie
# 与 server.properties 中 log.dirs 路径相关
# 查看节点1内容 (节点2,节点3 也是1个主分区1个副本分区)
ls /tmp/kafka-logs/alfie*
/tmp/kafka-logs/alfie-0:
00000000000000000000.index 00000000000000000000.timeindex partition.metadata
00000000000000000000.log leader-epoch-checkpoint
/tmp/kafka-logs/alfie-1:
00000000000000000000.index 00000000000000000000.timeindex partition.metadata
00000000000000000000.log leader-epoch-checkpoint
验证topic
kafka-topics.sh --describe \
--bootstrap-server 192.168.100.101:9092 --topic alfie
Topic: alfie TopicId: 1U7iI3psRz-ffGDRpT_IpQ PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: alfie Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: alfie Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: alfie Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2
消费消息
# --from-beginning 表示消费前发布的消息也能收到,默认只能收到消费后发布的新消息
kafka-console-consumer.sh --topic alfie \
--bootstrap-server 192.168.100.101:9092
--from-beginning
<以下来自标准输出>
alfie-msg1
alfie-msg2
# 作为group1的成员,如果该消费者消费了,同组的其他成员将无法消费
# 不会像上一条命令,可以多次通过 `--from-beginning` 多次重复消费
kafka-console-consumer.sh --topic alfie \
--bootstrap-server 192.168.100.101:9092 \
--consumer-property group.id=group1 \
--from-beginning
GUI查看
Offset Explorer
旧称Kafka Tool
,工具是一个 GUI 应用程序,用于管理和使用 Apache Kafka 群集。
# 版本要求
> 仅支持java11或以上 2024/05/31 (version 3.0)
# 安装offset explorer
wget https://www.kafkatool.com/download3/offsetexplorer.sh
bash offsetexplorer.sh