第六周作业

mongoDB

复制集

MondDB 的主从复制集架构中:
!!! warning 复制集的职能

  • 数据冗余:MongoDB复制集通过在多个服务器间自动复制数据实现数据冗余,适用于需要数据备份和灾难恢复的场景。
  • 高可用性:MongoDB复制集通过在主节点故障时自动切换到副本节点确保高可用性,适用于需要无中断服务的生产环境。
  • 自动故障切换:MongoDB复制集在检测到主节点故障时自动将副本节点提升为主节点,实现快速故障恢复,适用于需要高可靠性的应用程序。
    !!!

原理

基本信息

  • 主从关系
    • 仅有一个主节点,其余都是从节点
    • 只有主节点能够写入/读取
    • 从节点能读取数据,但默认只有主节点负责处理请求
  • 常见架构
    • 一主一从
    • 一主二从
  • 复制本质
    • 所有的操作都有oplog,从节点定期轮询主节点获取这些操作,然后对自己的数据副本执行这些操作
  • 主要特征
    • N 个奇数节点的集群
    • 基于选举机制,任何节点可作为主节点
    • 所有写入操作都在主节点上,所以增加节点不会提高系统写性能,可以提升读性能
    • 主节点故障时,会自动选举出新节点代替自动故障转移

选举方式

选举算法:Raft

!!! info 角色与操作

  • 主节点 Master
    • 数量:1
    • 作用:
      • 默认回应读写请求
      • 副本节点通信同步操作,同步存活信息
    • 失效后:
      • 触发选举执行
  • 副本节点 Secondary
    • 数量:N (无仲裁者下 N+1 应为奇数)
      • 最多只能有7个节点有选举权,但副本可大于该数值
    • 作用:
      • 执行主节点下发操作,维护副本数据
      • 当主节点故障时,触发选举选定下一个Master
    • 失效后:
      • 不被关心
  • 仲裁者 Arbiter
    • 数量:M (M+N+1 应为奇数)
    • 作用:
      • 仅参与选主投票
      • 不保存数据
    • 失效后:
      • 不被关心
        !!!

!!! success 选举流程

  1. 具有投票权的节点之间两两互相发送心跳
  2. 5次心跳未收到时判断为节点失联
  3. 如果失联的是主节点,从节点会发起选举,选出新的主节点
  4. 如果失联的是从节点则不会产生新的选举
  5. 选举基于RAFT一致性算法实现,选举成功的必要条件是大多数投票节点存活
  6. 复制集中最多可以有50个节点,但具有投票权的节点最多7个,且为奇数个投票成员
    !!!

!!! warning priority0 节点

  • 图解
  • 说明
    • 默认Priority为1,值最大优先级越高。
    • 设置Priority为0节点的选举优先级为0,不会被选举为Primary,但可以投票
  • 身份特征
    • priority:0
  • 常见场景
    • 跨机房A、B部署了一个复制集,并且希望 Primary 必须在 A机房;这时可以将B机房的复制集成员Priority设置为0
      !!!

!!! warning hidden 节点

  • 图解
  • 说明
    • 不接受Driver的请求
    • 不参与选主,也不对外提供服务
  • 身份特征
    • priority:0
    • hiddent: true
  • 常见场景
    • 做一些数据备份、离线计算的任务,不会影响复制集的服务
      !!!

!!! warning Delayed 节点

  • 图解
  • 说明
    • Delayed节点必须是Hidden节点,并且其数据落后与Primary一段时间
    • 不应该提供服务或参与选主
  • 身份特征
    • priority: 0
    • hidden: ture
    • slaveDelay: 3600
  • 常见场景
    • 延时节点的数据集是延时的,因此它可以帮助我们在人为误操作或是其他意外情况下恢复数据
    • 当应用升级失败,或是误操作删除了表和数据库时,可以通过延时节点进行数据恢复
      !!!

故障自恢复

  1. Master 发生故障
  2. 复制集内部会进行投票选举
  3. 一个Secondary替代原有主库对外提供服务
  4. 复制集会自动通知客户端程序主库已切换
  5. 应用就会连接到新的主库

请求模式

默认情况下,应用程序将其读取操作指向复制集的主节点
但是,客户端可以通过read preference 模式指定将读取操作发送给到从节点

Read Preference Mode Description
primary 主节点,默认模式,读操作只在主节点,如果主节点不可用,报错或者抛出异常。
primaryPreferred 首选主节点,大多情况下读操作在主节点,如果主节点不可用,如故障转移,读操作在从节点。
secondary 从节点,读操作只在从节点, 如果从节点不可用,报错或者抛出异常。
secondaryPreferred 首选从节点,大多情况下读操作在从节点,特殊情况(如单主节点架构)读操作在主节点。
nearest 最邻近节点,读操作在最邻近的成员,可能是主节点或者从节点。

案例操作

  1. 安装mongodb
# 关闭防火墙和SELinux
setenforce 0
systemctl stop firewalld

# 调整内核HPG
cat >> /etc/rc.local <<EOF
echo never > /sys/kernel/mm/transparent hugepage/enabled
EOF
chmod a+x /etc/rc.local

# 创建用户
useradd mongod

# 创建目录
mkdir -p /mongodb/{conf,data,log}

# 创建配置文件
cat > /mongodb/conf/mongo.conf <<EOF
systemLog:
  destination: file
  path: /mongodb/log/mongodb.log
  logAppend: true
storage:
  dbPath: /mongodb/data/
processManagement:
  timeZoneInfo: /usr/share/zoneinfo
  fork : true
net:
  port: 27017
  bindIp: 0.0.0.0
security:
  authorization: disabled
replication:
  replSetName: alfieRepl
EOF

# 下载源代码
wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel80-7.0.11.tgz

# 解压包
tar xf mongodb-linux-x86_64-rhel80-7.0.11.tgz -C /usr/local
ln -s /usr/local/mongodb-linux-x86_64-rhel80-7.0.11/ /usr/local/mongodb

# 设置PATH变量
echo PATH=/usr/local/mongodb/bin/:'$PATH' > /etc/profile.d/mongodb.sh
. /etc/profile.d/mongodb.sh

# systemd管理文件
cat > /lib/systemd/system/mongod.service <<EOF
[Unit]
Description=mongodb
After=network.target remote-fs.target nss-lookup.target

[Service]
Type=forking
User=mongod
Group=mongod
ExecStart=/usr/local/mongodb/bin/mongod --config /mongodb/conf/mongo.conf
ExecReload=/bin/kill -s HUP \$MAINPID
ExecStop=/usr/local/mongodb/bin/mongod --config /mongodb/conf/mongo.conf -- shutdown
PrivateTmp=true
LimitFSIZE=infinity
LimitCPU=infinity
LimitAS=infinity
LimitNOFILE=64000
LimitNPROC=64000
LimitMEMLOCK=infinity
TasksMax=infinity
TasksAccounting=false

[Install]
WantedBy=multi-user.target
EOF

# 文件归属修改
chown -R mongod:mongod /usr/local/mongodb/
chown -R mongod:mongod /mongodb/

# 启动服务
systemctl daemon-reload
systemctl enable --now mongod.service
  1. 配置一主二从
mongo --port 27017 admin

config = { _id: 'alfieRepl', members: [
{_id: 0, host: '192.168.100.101:27017'},
{_id: 1, host: '192.168.100.102:27017'} ,
{_id: 2, host: '192.168.100.103:27017'}]
}

printjson(config)

rs.initiate(config)

>> OR
rs.initiate ( )
rs.add("ip.ip.ip.ip:port")
...

  1. 配置一主一从一仲裁
config = { _id: 'alfieRepl', members: [
{_id: 0, host: '192.168.100.101:27017'},
{_id: 1, host: '192.168.100.102:27017'} ,
{_id: 2, host: '192.168.100.103:27017', "arbiterOnly": true}]
}

rs.initiate(config)
  1. 复制集状态
  • rs.hello() // 常规信息
  • rs.isMaster() // 查主
  • rs.config() // 配置信息
  • rs.printSecondaryReplicationInfo()
  • db.printReplicationInfo()

管理操作

节点管理

  • rs.remove ("ip: port") // 删除节点

  • rs.add ("ip: port") // 增加节点

  • rs.addArb ("ip: port") // 增加arb节点

特殊节点管理

// 获得当前复制集点配置
config=rs.conf()

// 对节点属性进行修改
// 修改节点3为hidden节点
config.members[2].hidden=true
  • config.members[2].hidden=true
  • 从Hideen节点恢复为常规节点
    • config.members[2].hidden=false
    • config.members[2].arbiterOnly=false
    • config.members[2].priority=1

    • 由于与此前配置不同需要通过reconfig,删除再添加节点

主节点降级

  • rs.stepDown()

开启从库读取

#打开从节点读支持
#7版本命令
db.getMongo().setReadPref('secondary')
db.getMongo().getReadPref()

#5版命令
myrepl:SECONDARY> rs.secondaryOk()

#旧版命令已废弃
myrepl:SECONDARY> rs.slaveOk()
  • rs.secondaryOK() 已被抛弃
  • 7版本的切换命令

zookeeper

集群架构

!!! tip 基本信息

  • 集群模型

    • Master/Slave 模型
  • Master

    • 负责写操作,同时被称为Leader节点
  • Slave

    • 负责读操作,同时被称为follower节点
  • 写操作同步

    • 写操作由Master处理完成,再同步给Slave节点
    • 当写操作大于半数节点时,写操作判定为成功
  • 可用性

    • 可用节点 > (总节点数/2) 才认定 zookeeper系统可用
  • 性能

    • 增加服务器数量可以提高读请求处理能力,但其效果在服务器数量增加时逐渐减弱。
      !!!

集群角色

序号 状态 角色 职责描述
1 稳定状态 领导者(Leader) 负责处理写入请求的,事务请求的唯一调度和处理者,负责进行投票发起和决议,更新系统状态
2 稳定状态 跟随者(Follower) 接收客户请求并向客户端返回结果,在选Leader过程中参与投票
3 稳定状态 观察者(Observer) 转交客户端写请求给leader节点,和同步leader状态。
和Follower唯一区别就是不参与Leader投票,也不参与写操作的"过半写成功"策略
4 中间状态 学习者(Learner) 和leader进行状态同步的节点统称Learner,包括:Follower和Observer
5 NA 客户端(client) 请求发起方

选举过程

!!! info 选举状态和依据

  • 状态
    • LOOKING:寻找 Leader 状态,处于该状态需要进入选举流程
    • LEADING:领导者状态,处于该状态的节点说明是角色已经是 Leader
    • FOLLOWING:跟随者状态,表示 Leader 已经选举出来,当前节点角色是 follower
    • OBSERVER:观察者状态,表明当前节点角色是 observer
  • 依据
    • zxid的优先级高于myidzxid相同时比较myid大小
    • zxid (zookeeper transaction id):
      • 产生方式:代表了该服务器处理的最后一个事务的ID
      • 值含义:值越大表示它的数据最完整最新
      • zxid 最大的节点优先选为 Leader
    • myid 服务器的唯一标识(SID):
      • 产生方式:通过配置 myid 文件指定
      • 值含义:没特定意义,常用于人为操作选举
      • myid 最大的节点优先选为 Leader
        !!!

!!! success 首次选举
0. 配置文件会决定有选举权限的 zookeeper 节点

  1. 每个zookeeper 的投票中都会包含自己的myid和zxid
  2. 每个节点接受并检查对方的投票信息,比如投票时间、是否状态为LOOKING状态的投票
  3. 对比投票,优先检查zxid,如果zxid 不一样则 zxid 大的为leader
  4. 如果zxid相同则继续对比myid,myid 大的一方为 leader
sequenceDiagram
    participant ZK1 as ZooKeeper 1 <br>(myid=1, zxid=0x0)
    participant ZK2 as ZooKeeper 2 <br>(myid=2, zxid=0x0)
    participant ZK3 as ZooKeeper 3 <br>(myid=3, zxid=0x0)
    
    ZK1->>All: 发出投票 (myid=1, zxid=0x0)
    ZK2->>All: 发出投票 (myid=2, zxid=0x0)
    ZK3->>All: 发出投票 (myid=3, zxid=0x0)
    
    Note right of All: 每个节点接收并检查对方的投票
    
    alt 比较 zxid
        ZK1->>ZK2: 检查投票 (zxid=0x0)
        ZK1->>ZK3: 检查投票 (zxid=0x0)
        ZK2->>ZK1: 检查投票 (zxid=0x0)
        ZK2->>ZK3: 检查投票 (zxid=0x0)
        ZK3->>ZK1: 检查投票 (zxid=0x0)
        ZK3->>ZK2: 检查投票 (zxid=0x0)
        
        Note right of All: 所有 zxid 相同,比较 myid
    end
    
    alt 比较 myid
        ZK1->>ZK2: 检查 myid (myid=1)
        ZK1->>ZK3: 检查 myid (myid=1)
        ZK2->>ZK1: 检查 myid (myid=2)
        ZK2->>ZK3: 检查 myid (myid=2)
        ZK3->>ZK1: 检查 myid (myid=3)
        ZK3->>ZK2: 检查 myid (myid=3)
        
        Note right of All: ZK3 的 myid 最大,成为 leader
    end
    
    ZK1->>ZK3: 选出 ZK3 作为 Leader
    ZK2->>ZK3: 选出 ZK3 作为 Leader
    ZK3->>All: 宣布成为 Leader

!!!

!!! warning 心跳保持

  • 保持方式
    • 利用ping确认对方是否存活
  • 触发重新选举
    • Leader无法响应PING 时,将重新发起 Leader 选举
  • Leader无法响应的原因
    1. 网络阻塞
    2. 网络中断
    3. keepavlive程序故障
    4. 系统崩溃重启
sequenceDiagram
    participant Leader
    participant Follower1
    participant Follower2
    
    Note right of Leader: Leader 定期发送心跳消息 (PING)
    Leader->>Follower1: 发送 PING
    Leader->>Follower2: 发送 PING
    
    Note right of Follower1: Follower1 返回心跳确认 (ACK)
    Follower1-->>Leader: 返回 ACK
    
    Note right of Follower2: Follower2 返回心跳确认 (ACK)
    Follower2-->>Leader: 返回 ACK
    
    Note right of Leader: Leader 检查心跳确认 (ACK)
    alt 超时未收到 ACK
        Note right of Leader: 标记 Follower 为失效
        Leader->>Follower1: 重新尝试 PING 或标记失效
    else 收到 ACK
        Note right of Leader: 标记 Follower 正常
    end
    
    Note right of Follower1: Follower1 定期发送心跳消息 (PING)
    Follower1->>Leader: 发送 PING
    
    Note right of Follower2: Follower2 定期发送心跳消息 (PING)
    Follower2->>Leader: 发送 PING
    
    Note right of Leader: 返回心跳确认 (ACK)
    Leader-->>Follower1: 返回 ACK
    Leader-->>Follower2: 返回 ACK
    
    Note right of Follower1: Follower1 检查心跳确认 (ACK)
    alt 超时未收到 ACK
        Note right of Follower1: Follower1 进入 LOOKING 状态
        Follower1->>Follower1: 开始新的 Leader 选举
    else 收到 ACK
        Note right of Follower1: 标记 Leader 正常
    end
    
    Note right of Follower2: Follower2 检查心跳确认 (ACK)
    alt 超时未收到 ACK
        Note right of Follower2: Follower2 进入 LOOKING 状态
        Follower2->>Follower2: 开始新的 Leader 选举
    else 收到 ACK
        Note right of Follower2: 标记 Leader 正常
    end

!!!

!!! failure 重新选举

  • 异常情况出现
    • ZAB(Zookeeper Atomic Broadcast) 协议就会进入恢复模式并选举产生新的Leader服务器
      • ZAB协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。
      • 在ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性
      • 基于该协议,ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性。
sequenceDiagram
    participant ZK1 as ZooKeeper 1
    participant ZK2 as ZooKeeper 2
    participant ZK3 as ZooKeeper 3
    participant Client as Client
    
    Note right of All: Leader Election(选举阶段)
    ZK1->>All: 发出选举投票 (myid=1, zxid=0x0)
    ZK2->>All: 发出选举投票 (myid=2, zxid=0x0)
    ZK3->>All: 发出选举投票 (myid=3, zxid=0x0)
    
    Note right of All: ZK3 得到超半数票数,成为准 leader
    ZK1->>ZK3: 投票 (myid=1)
    ZK2->>ZK3: 投票 (myid=2)
    
    Note right of All: Discovery(发现阶段)
    ZK1->>ZK3: 同步最近接收的事务提议
    ZK2->>ZK3: 同步最近接收的事务提议
    
    ZK3->>ZK1: 确认接收的事务提议
    ZK3->>ZK2: 确认接收的事务提议
    
    Note right of All: Synchronization(同步阶段)
    ZK3->>ZK1: 同步最新提议历史
    ZK3->>ZK2: 同步最新提议历史
    
    ZK1->>ZK3: 同步完成确认
    ZK2->>ZK3: 同步完成确认
    
    Note right of All: 准 leader 成为真正的 leader
    ZK3->>All: 宣布成为 Leader
    
    Note right of All: Broadcast(广播阶段)
    ZK3->>ZK1: 广播事务
    ZK3->>ZK2: 广播事务
    
    ZK1->>ZK3: 确认事务
    ZK2->>ZK3: 确认事务
    
    Client->>ZK3: 发送请求
    ZK3->>ZK1: 广播请求
    ZK3->>ZK2: 广播请求
    
    ZK1->>ZK3: 确认请求
    ZK2->>ZK3: 确认请求
    
    ZK3->>Client: 返回响应
    
    Note right of All: 新节点加入同步
    participant NewZK as 新节点
    NewZK->>ZK3: 请求加入
    ZK3->>NewZK: 同步最新提议历史
    NewZK->>ZK3: 同步完成确认

!!!

事务日志与快照

  • 事务产生
    • 当集群收到操作时,请求将被转给Leader
    • Leader将把操作转换为带有状态的事务
  • 事物处理
    • Leader对该操作进行广播以便进行协调
    • 当超过半数节点写入表示协调通过
    • Leader将通知服务器节点将不能次写操作应用到内存数据库中
    • 将该记录写入到事务日志
  • 快照的产生
    • 事务日志达到一定额数量(如10万次)
    • 内存数据库序列化永久保存到磁盘中
    • 序列化后的文件称为快照
    • 拍快照的同时会生成事务日志
sequenceDiagram
    participant Client as Client
    participant Follower1 as ZooKeeper Follower1
    participant Follower2 as ZooKeeper Follower2
    participant Leader as ZooKeeper Leader
    
    Note right of Client: 发送写操作请求
    Client->>Follower1: 发送写操作请求
    
    Note right of Follower1: 将请求转发给Leader
    Follower1->>Leader: 转发写操作请求
    
    Note right of Leader: Leader 处理写操作
    Leader->>Leader: 将写操作转换为带有状态的事务
    
    Note right of Leader: 广播事务给所有Follower
    Leader->>Follower1: 广播事务
    Leader->>Follower2: 广播事务
    
    Note right of All: Follower 接收并确认事务
    Follower1->>Leader: 确认事务
    Follower2->>Leader: 确认事务
    
    Note right of Leader: Leader 收集多数确认
    alt 大多数节点允许写操作
        Leader->>All: 通知所有节点应用写操作
        Leader->>Follower1: 应用写操作
        Leader->>Follower2: 应用写操作
        
        Note right of Follower1: 将写操作应用到内存数据库并记录到事务日志
        Follower1->>Follower1: 应用写操作并记录到事务日志
        
        Note right of Follower2: 将写操作应用到内存数据库并记录到事务日志
        Follower2->>Follower2: 应用写操作并记录到事务日志
        
        Leader->>Leader: 将写操作应用到内存数据库并记录到事务日志
        
        Note right of All: 客户端收到写操作成功响应
        Leader->>Client: 返回写操作成功响应
        
        Note right of All: 事务日志记录达到一定次数
        alt 达到事务日志阈值 (默认10W次)
            Note right of All: 将内存数据库序列化保存到磁盘上
            Leader->>Disk: 生成快照文件
            Follower1->>Disk: 生成快照文件
            Follower2->>Disk: 生成快照文件
        end
    else 未达到大多数确认
        Note right of Leader: Leader 返回写操作失败响应
        Leader->>Client: 返回写操作失败响应
    end

环境要求

# 最低JDK版本为1.8
yum install java-1.8.0-openjdk

# JDK 11 可使用以下命令
yum install java-11-openjdk

集群部署

名称 IP zxid 角色
s1 192.168.100.101 1 Follower
s2 192.168.100.102 2 Follower
s2 192.168.100.103 3 Leader

集群安装

# 最低JDK版本为1.8
yum install java-1.8.0-openjdk

# 下载二进制
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.9.0/apache-zookeeper-3.9.0-bin.tar.gz

# 解压包
tar xf apache-zookeeper-3.9.0-bin.tar.gz -C /usr/local/
ln -s /usr/local/apache-zookeeper-3.9.0-bin /usr/local/zookeeper

# 路径追加
echo 'PATH=/usr/local/zookeeper/bin:$PATH' > /etc/profile.d/zookeeper.sh
source  /etc/profile.d/zookeeper.sh

# 配置文件获得
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg

mkdir /usr/local/zookeeper/data
# 修改配置文件 /usr/local/zookeeper/conf/zoo.cfg 
# 格式: server.MyID服务器唯一编号=服务器IP:Leader和Follower的数据同步端口
sed -i -e 's!dataDir=.*!dataDir=/usr/local/zookeeper/data!' \
-e '$ a server.1=192.168.100.101:2888:3888\nserver.2=192.168.100.102:2888:3888\nserver.3=192.168.100.103:2888:3888' \
/usr/local/zookeeper/conf/zoo.cfg

# 生成ZXID文件
# 各个myid文件的内容要和zoo.cfg文件相匹配
# server1 为1, server2 为2, server3 为3
echo 1 > /usr/local/zookeeper/data/myid

# 启动服务
zkServer.sh start

日志检查

/usr/local/zookeeper/logs/zookeeper-root-server-s1.out

端口检查

状态检查

  • slave 状态,监听3888端口
  • master 状态,监听2888,3888端口

命令行访问

  • 进入交互命令
    • zkCli.sh -server node:2181
  • 创建节点/查看节点/修改节点/删除节点(默认持久节点)
  • 查看节点元数据
  • 查看配置

GUI访问

# 准备终端转发环境
dnf install xauth -y
ssh -X root@192.168.100.101

# 准备maven环境
dnf install maven -y
/etc/maven/settings.xml
  <mirrors>
    <mirror>
      <id>aliyun</id>
      <mirrorOf>*</mirrorOf>
      <name>Nexus for aliyun</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
  </mirrors>

# 下载源代码
dnf install -y git
git clone https://github.com/zzhang5/zooinspector.git
cd zooinspector

# 编译安装
mvn clean package -Dmaven.test.skip=true

# 图形界面启动
chmod +x target/zooinspector-pkg/bin/zooinspector.sh
target/zooinspector-pkg/bin/zooinspector.sh

python SDK访问

  • 准备工作
    # 安装python和相关库
    dnf install python3 
    pip3 install kazoo
  • 测试调用
    #!/usr/bin/python3
    from kazoo.client import KazooClient
    zk = KazooClient(hosts='192.168.100.101:2181')
    
    zk.start()
    # 创建节点:makepath 设置为 True ,父节点不存在则创建,其他参数不填均为默认
    zk.create('/zkapp/test',b'this is a test',makepath=True)
    
    # 操作完后关闭zk连接
    data=zk.get('/zkapp/test')
    print(data)
    zk.stop()

kafka

消息队列

软件之间互相通信如今就像呼吸一样简单,但回到80年代这是一个头疼的事情。
不同软件之间通信需要实现不同的协议,软件通信没有任何组织可言,面对这个局面。
有人率先打开局面,既然主机有总线,那么软件之间也可以有,这便诞生了消息队列
消息队列的出现使软件通信出现的曙光,但很快大公司也进来了,各家的消息队列粉末登场各部兼容
在这个背景下,规范组织插手推出AMQP,消息队列迎来了规范化发展时代

主流的消息队列工具: KafkaRabbitMQActiveMQRocketMQ

!!! tip MQ摘要信息

  • 本质
    • 消息队列是一种异步的服务间通信方式
  • 适用场合
    • 适用于无服务器和微服务架构。
  • 使用方式
    • 消息在被处理和删除之前一直存储在队列上,每条消息仅可被一位用户处理一次用户处理一次。
  • 业务场景
    1. 重量级访问请求处理(削峰填谷)
    2. 请求按时排队(顺序收发)
    3. 异步通信需求(异步解耦)
    4. 数据分析处理(大数据分析)
    5. 缓存信息降低数据库压力(分布式缓存同步)
    6. 链路访问压力测试(蓄流压测)
      !!!

简介

Kafka 是一个事件流平台(event streaming platform)

  • 用于发布(写入)和订阅(读取)事件流,包括从其他系统持续导入/导出数据。
  • 用于持久可靠地存储事件流,只要你需要可以一直保存。
  • 用于在事件发生时事后处理事件流。

!!! tip kafka摘要信息

  • 开发语言:

    • scalajava
  • MQ对别

  • 特点

    • 分布式: 多机实现,不允许单机
    • 分区: 一个消息.可以拆分出多个,分别存储在多个位置
    • 多副本: 防止信息丢失,可以多来几个备份
    • 多订阅者: 可以有很多应用连接kafka
    • Zookeeper: 早期版本的Kafka依赖于zookeeper, v2.8.0 (2021) 后可单独部署
  • 优势

    • 优秀的数据结构: 通过 O(1) 的磁盘数据结构提供消息的持久化
    • 高吞吐量: 支持每秒数百万的消息
    • 分布式: 基于分布式集群实现高可用的容错机制,可以实现自动的故障转移
    • 顺序保证: 保证数据会按照特定的顺序来处理
    • 大数据友好: 支持 Hadoop 并行数据加载
  • 常用场景

    • 大数据
      !!!

角色流程

角色

!!! info 操作

  • Producer (写)
    • Producer即生产者,消息的产生者,是消息的入口。负责发布消息到Kafka
  • Consumer (读)
    • 消费者,用于消费消息,即处理消息
      !!!

!!! info 对象

  • Broker
    • Broker是kafka实例,每个服务器上可以有一个或多个kafka的实例
    • Broker需要通过唯一的编号区分
  • Topic
    • 消息的主题,可以理解为消息的分类,数据库上的一张表
    • 一个消息即为topic上的一条记录
    • 一个broker对应多个topic
    • 一个topic可对应多个broker (分布式存放)
    • consumer 通过 topic访问数据,不关系存放地broker
  • consumer group
    • consumer 需要属于一个特定的组(不指定则为默认组)
    • 同一个topic的一条消息只能被一个consumer group一个consuer消费
    • 多个consumer group可同时消费一个消息
      !!!

!!! info 存储方式

  • Partition -- 分区存储提高读取
    • topic可以拆分为一个或多个 Partition
    • 创建topic可指定partition数量
    • 表现形式为文件夹
    • 目的是实现负载均衡,提高吞吐量
    • 同一个topic在不同的partition唯一
    • partition 数量应该不超过节点数量
    • partition 没有顺序
  • Replication -- 分片备份提高可用性
    • 本质是同样数据的副本
    • 建议设定至少2
    • kafka的副本数量包含主分片数
    • 分区角色:
      • AR(总分片): Assigned Replicas 分区中的所有副本的统称,包括leader和 follower
        • AR= lSR+ OSR
      • ISR(可用分片): ln Sync Replicas 与leader副本保持同步的副本 本 follower和leader本身组成的集合
      • OSR(不可用分片): out-of-Sync Replied 与leader副本同步不能同步的 follower的集合
        !!!

流程

  1. produceer从集群中获知leader信息
  2. producer消息发送给leader
  3. leader将消息写入本地文件
  4. followerleaderpull同步消息
  5. follower消息写入的确认返回给leader
  6. leader收到所有replication的确认后,返回producer回复消息已写入

部署

名称 IP
s1 192.168.100.101
s2 192.168.100.102
s2 192.168.100.103

!!! warning 版本名称

kafka由scale开发,scale存在多版本,因此其版本名称会包含scale版本呢
kafka_<scala 版本>-<kafka 版本>
目前主流scale版本 2.12 / 2.13
!!!

# java环境准备 Java8
yum install java-1.8.0-openjdk

# 下载kafka包
wget https://downloads.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz

# 解压链接
tar xf kafka_2.13-3.6.2.tgz -C /usr/local/
ln -s /usr/local/kafka_2.13-3.6.2/ /usr/local/kafka

# 配置PATH
echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
source /etc/profile.d/kafka.sh

# 修改配置
vim /usr/local/kafka/config/server.properties
#每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的meta.properties文件 --> 节点定制修改
broker.id=1 
#指定当前主机的IP做为监听地址,注意:不支持0.0.0.0 ---> 节点定制修改
listeners=PLAINTEXT://192.168.100.101:9092 
#kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
log.dirs=/usr/local/kafka/data 
#设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
num.partitions=1 
#指定默认的副本数为3,可以实现故障的自动转移
default.replication.factor=3 
#设置kafka中消息保留时间,默认为168小时即7天
log.retention.hours=168 
#指定连接的zk的地址,zk中存储了broker的元数据信息 --> 设置相同即可
zookeeper.connect=192.168.100.101:2181,192.168.100.102:2181,192.168.100.103:2181 
#设置连接zookeeper的超时时间,单位为ms,默认6秒钟
zookeeper.connection.timeout.ms=6000 

# 准备数据目录
mkdir /usr/local/kafka/data

# 调整JAVA内存
vim /usr/local/kafka/bin/kafka-server-start.sh
if[ " x$KAFKA_HEAP_OPTS"="x"] ; then
  export KAFKA_HEAP_OPTS=" -Xmx1G -Xms1G"
fi

# 启动服务
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

# 错误查看文件
/usr/local/kafka/logs/kafkaServer.out

# 关闭服务,使用systemd接管
kafka-server-stop.sh

# 准备service文件
cat > /lib/systemd/system/kafka.service <<EOF
[Unit]
Description=Apache kafka
After=network.target

[Service]
Type=simple
PIDFile=/usr/local/kafka/kafka.pid
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/bin/kill -TERM 
Restart=always
RestartSec=20

[Installed]
wantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl start kafka.service

检查

zookeeper 此时多出 kafka的信息

操作

写入操作

创建topic

# 创建一个名为 `alfie` 的topic
# 为其创建 3 个分区, 每个分区2个副本 => (3分区,每个分区存放1个主分片,1个备用分片)
kafka-topics.sh --create --topic alfie \
--bootstrap-server 192.168.100.101:9092 \
--partitions 3 --replication-factor 2

发送消息

kafka-console-producer.sh --broker-list \
192.168.100.101:9092,192.168.100.102:9092,192.168.100.103:9092 \
--topic alfie

< 以下来自标准输入 >
> alfie-msg1
> alfie-msg2
>...

删除topic

kafka-topics.sh --delete \
--bootstrap-server 192.168.100.101:9092,192.168.100.102:9092,192.168.100.103:9092 \
--topic alfie

读取数据

查看topic

kafka-topics.sh --list --bootstrap-server 192.168.100.101:9092
> alfie

# 与 server.properties 中 log.dirs 路径相关
# 查看节点1内容 (节点2,节点3 也是1个主分区1个副本分区)
ls /tmp/kafka-logs/alfie*
/tmp/kafka-logs/alfie-0:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

/tmp/kafka-logs/alfie-1:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

验证topic

kafka-topics.sh --describe \
--bootstrap-server 192.168.100.101:9092 --topic alfie

Topic: alfie	TopicId: 1U7iI3psRz-ffGDRpT_IpQ	PartitionCount: 3	ReplicationFactor: 2	Configs: 
    Topic: alfie	Partition: 0	Leader: 1	Replicas: 1,3	Isr: 1,3
    Topic: alfie	Partition: 1	Leader: 2	Replicas: 2,1	Isr: 2,1
    Topic: alfie	Partition: 2	Leader: 3	Replicas: 3,2	Isr: 3,2

消费消息

# --from-beginning  表示消费前发布的消息也能收到,默认只能收到消费后发布的新消息
kafka-console-consumer.sh --topic alfie \
--bootstrap-server 192.168.100.101:9092
--from-beginning

<以下来自标准输出>
alfie-msg1
alfie-msg2

# 作为group1的成员,如果该消费者消费了,同组的其他成员将无法消费
# 不会像上一条命令,可以多次通过 `--from-beginning` 多次重复消费
kafka-console-consumer.sh --topic alfie \
--bootstrap-server 192.168.100.101:9092 \
--consumer-property group.id=group1 \
--from-beginning

GUI查看

Offset Explorer 旧称Kafka Tool,工具是一个 GUI 应用程序,用于管理和使用 Apache Kafka 群集。

# 版本要求
> 仅支持java11或以上 2024/05/31 (version 3.0)

# 安装offset explorer
wget https://www.kafkatool.com/download3/offsetexplorer.sh
bash offsetexplorer.sh