EasyMQ

分布式消息队列系统 · HTTP API · Web 管理界面

10000+
QPS
多队列
动态创建
BadgerDB
持久化
Go
纯 Go 实现

产品概述

EasyMQ 是一款完全自主研发的分布式消息队列系统,采用 Go 语言编写,基于 Gin Web 框架,提供 HTTP 接口进行消息生产和消费。系统支持多队列管理、消息确认机制、BadgerDB 持久化、分布式节点管理和完整的 Web 管理界面。

EasyMQ 定位于微服务异步通信、任务队列、日志收集、事件驱动架构等场景,提供轻量级、易集成的消息队列解决方案,支持多节点部署,保证消息可靠投递。

技术架构

┌─────────────────────────────────────────────────────────┐
│                    Web Interface                         │
│               (HTML/CSS/JS)                              │
└────────────────────┬────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────┐
│              HTTP API Layer               │
│         (Gin Web Framework)             │
└────────────────────┬────────────────────────────┘
                     │
        ┌────────────┼────────────┐
        │            │            │
┌───────▼──────┐ ┌──▼─────────┐ ┌──▼────────┐
│  Core Engine │ │ Node Mgr  │ │  Stats   │
└───────┬──────┘ └────┬──────┘ └──────────┘
        │               │
┌───────▼──────┐ ┌─────▼──────┐
│   Storage    │ │ Heartbeat │
└──────────────┘ └────────────┘
应用层
HTTP 生产者 HTTP 消费者 Web 管理界面
API 层
Gin 路由 中间件 请求验证 响应包装
核心引擎
队列管理 消息管理 生产者管理 消费者管理 统计收集
节点管理
节点注册 心跳检测 节点发现 负载均衡
存储层
内存存储 BadgerDB 持久化 快照恢复

核心特性

📨 消息生产消费

基于 HTTP API 的消息生产与消费,支持多队列、动态创建队列,消息自动分配唯一 ID,支持消息确认机制。

💾 数据持久化

支持内存和 BadgerDB 持久化存储,断电后消息不丢失,重启服务后消息自动恢复,确保消息可靠性。

🌐 分布式节点

支持多节点部署,节点自动发现与注册,心跳检测感知节点状态,负载均衡在多个节点间分发请求。

🖥️ Web 管理界面

完整的 Web 管理界面,支持仪表盘监控、队列管理、节点管理、消息追踪、配置管理、用户认证。

🔒 消息确认机制

消息消费后需要显式确认,超时未确认的消息自动重新入队,保证消息至少被消费一次。

📊 实时监控统计

内置统计信息收集,实时监控生产者、消费者数量,消息总数,队列积压情况,请求延迟等指标。

🔐 安全认证

支持 API 访问认证、权限控制、用户密码管理、敏感数据加密存储,防止恶意请求攻击。

⚡ 高并发处理

基于 Go 协程的高并发处理能力,支持 10000+ QPS,sync.Map 并发安全控制,锁机制保证数据一致性。

📝 结构化日志

多级别日志(DEBUG/INFO/WARN/ERROR/FATAL),文本/JSON 格式输出,支持控制台/文件双输出。

🔄 优雅关闭

服务优雅关闭机制,处理未完成请求,释放资源,确保服务平稳停止,避免数据丢失。

⚙️ 灵活配置

支持 JSON/YAML 配置文件,环境变量覆盖,默认配置设置,配置热加载基础框架,适配不同部署环境。

🔧 可扩展设计

存储层抽象接口可扩展更多后端,API 层可扩展支持 AMQP/MQTT 协议,节点层可扩展更多发现机制。

API 文档

消息接口

接口方法说明
/api/v1/producePOST发送消息到指定队列
/api/v1/consumeGET从队列获取消息
/api/v1/ackPOST确认消息已消费

管理接口

接口方法说明
/api/v1/queuesGET获取队列列表
/api/v1/queuesPOST创建新队列
/api/v1/nodesGET获取节点列表
/api/v1/nodes/:idDELETE删除节点
/api/v1/statsGET获取统计信息

生产消息示例

# 发送消息到指定队列 curl -X POST http://localhost:9700/api/v1/produce \ -H "Content-Type: application/json" \ -d '{ "queue": "test-queue", "message": "Hello, EasyMQ!" }' # 响应示例 { "code": 200, "success": true, "data": { "message_id": "msg-abc123", "queue": "test-queue", "status": "pending" } }

消费消息示例

# 从队列获取消息 curl "http://localhost:9700/api/v1/consume?queue=test-queue" # 确认消息已消费 curl -X POST http://localhost:9700/api/v1/ack \ -H "Content-Type: application/json" \ -d '{ "queue": "test-queue", "message_id": "msg-abc123" }'

快速开始

1. 配置文件

# config.ini [server] host = 0.0.0.0 port = 9700 [log] level = info format = text output = stdout [storage] type = memory # type = badger [node] heartbeat_interval = 5s heartbeat_timeout = 15s

2. 启动服务

# 直接启动(默认端口 9700) go run main.go # 指定端口启动 go run main.go -port 9700

3. 访问 Web 界面

# 打开浏览器访问 http://localhost:9700/ # 功能 - 仪表盘:查看生产者、消费者、消息总数 - 队列管理:创建、删除队列 - 节点管理:查看节点状态,删除节点 - 消息追踪:查看消息内容、状态

4. 完整流程示例

# 1. 创建队列 curl -X POST http://localhost:9700/api/v1/queues \ -H "Content-Type: application/json" \ -d '{"name": "order-queue"}' # 2. 生产消息 curl -X POST http://localhost:9700/api/v1/produce \ -H "Content-Type: application/json" \ -d '{ "queue": "order-queue", "message": "{\"order_id\": 1001, \"action\": \"create\"}" }' # 3. 消费消息 curl "http://localhost:9700/api/v1/consume?queue=order-queue" # 4. 确认消息 curl -X POST http://localhost:9700/api/v1/ack \ -H "Content-Type: application/json" \ -d '{"queue": "order-queue", "message_id": "msg-xxx"}' # 5. 查看统计 curl http://localhost:9700/api/v1/stats

配置说明

配置项类型默认值说明
server.hoststring0.0.0.0监听地址
server.portint9700监听端口
log.levelstringinfo日志级别
log.formatstringtext日志格式(text/json)
log.outputstringstdout日志输出方式
storage.typestringmemory存储类型(memory/badger)
node.heartbeat_intervalduration5s心跳间隔
node.heartbeat_timeoutduration15s心跳超时时间

立即下载

选择合适的版本开始使用 EasyMQ

📦 下载完整包

版本: v1.0.0 | 平台: Windows/Linux amd64

包含: easymq.exe (服务器) + config.ini (配置文件) + Web 管理界面