Import x-panel source
This commit is contained in:
714
web/job/check_client_ip_job.go
Normal file
714
web/job/check_client_ip_job.go
Normal file
@@ -0,0 +1,714 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"sort"
|
||||
"time"
|
||||
"sync"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"fmt" // 中文注释 (新增): 导入 fmt 包用于格式化消息
|
||||
|
||||
"x-ui/database"
|
||||
"x-ui/database/model"
|
||||
"x-ui/logger"
|
||||
"x-ui/xray"
|
||||
"x-ui/web/service"
|
||||
)
|
||||
|
||||
// =================================================================
|
||||
// 中文注释: 以下是用于实现设备限制功能的核心代码
|
||||
// =================================================================
|
||||
|
||||
// ActiveClientIPs 中文注释: 用于在内存中跟踪每个用户的活跃IP (TTL机制)
|
||||
// 结构: map[用户email] -> map[IP地址] -> 最后活跃时间
|
||||
var ActiveClientIPs = make(map[string]map[string]time.Time)
|
||||
var activeClientsLock sync.RWMutex
|
||||
|
||||
// ClientStatus 中文注释: 用于跟踪每个用户的状态(是否因为设备超限而被禁用)
|
||||
// 结构: map[用户email] -> 是否被禁用(true/false)
|
||||
var ClientStatus = make(map[string]bool)
|
||||
var clientStatusLock sync.RWMutex
|
||||
|
||||
// CheckDeviceLimitJob 中文注释: 这是我们的设备限制任务的结构体
|
||||
type CheckDeviceLimitJob struct {
|
||||
inboundService service.InboundService
|
||||
xrayService *service.XrayService
|
||||
// 中文注释: 新增 xrayApi 字段,用于持有 Xray API 客户端实例
|
||||
xrayApi xray.XrayAPI
|
||||
// lastPosition 中文注释: 用于记录上次读取 access.log 的位置,避免重复读取
|
||||
lastPosition int64
|
||||
// 〔中文注释〕: 注入 Telegram 服务用于发送通知,确保此行存在。
|
||||
telegramService service.TelegramService
|
||||
|
||||
// violationStartTime: 记录用户“开始设备超限”的时间。
|
||||
// 用于实现“观察期”:刚发现超限时不封,等 3 分钟后如果还超限才封。
|
||||
violationStartTime map[string]time.Time
|
||||
|
||||
// triggerLock: 保护上述 Map 的读写安全
|
||||
triggerLock sync.Mutex
|
||||
}
|
||||
|
||||
// RandomUUID 中文注释: 新增一个辅助函数,用于生成一个随机的 UUID
|
||||
func RandomUUID() string {
|
||||
uuid := make([]byte, 16)
|
||||
rand.Read(uuid)
|
||||
uuid[6] = (uuid[6] & 0x0f) | 0x40
|
||||
uuid[8] = (uuid[8] & 0x3f) | 0x80
|
||||
return hex.EncodeToString(uuid[0:4]) + "-" + hex.EncodeToString(uuid[4:6]) + "-" + hex.EncodeToString(uuid[6:8]) + "-" + hex.EncodeToString(uuid[8:10]) + "-" + hex.EncodeToString(uuid[10:16])
|
||||
}
|
||||
|
||||
// NewCheckDeviceLimitJob 中文注释: 创建一个新的任务实例
|
||||
// 〔中文注释〕:增加一个 service.TelegramService 类型的参数。
|
||||
func NewCheckDeviceLimitJob(xrayService *service.XrayService, telegramService service.TelegramService) *CheckDeviceLimitJob {
|
||||
return &CheckDeviceLimitJob{
|
||||
xrayService: xrayService,
|
||||
// 中文注释: 初始化 xrayApi 字段
|
||||
xrayApi: xray.XrayAPI{},
|
||||
// 〔中文注释〕: 将传入的 telegramService 赋值给结构体实例。
|
||||
telegramService: telegramService,
|
||||
|
||||
// 初始化防抖 Map
|
||||
violationStartTime: make(map[string]time.Time),
|
||||
}
|
||||
}
|
||||
|
||||
// Run 中文注释: 定时任务的主函数,每次定时器触发时执行
|
||||
func (j *CheckDeviceLimitJob) Run() {
|
||||
// 中文注释: 检查 xray 是否正在运行,如果xray没运行,则无需执行此任务
|
||||
if !j.xrayService.IsXrayRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
// 1. 清理过期的IP
|
||||
j.cleanupExpiredIPs()
|
||||
|
||||
// 2. 解析新的日志并更新IP列表
|
||||
j.parseAccessLog()
|
||||
|
||||
// 3. 检查所有用户的设备限制状态
|
||||
j.checkAllClientsLimit()
|
||||
}
|
||||
|
||||
// cleanupExpiredIPs 中文注释: 清理长时间不活跃的IP
|
||||
func (j *CheckDeviceLimitJob) cleanupExpiredIPs() {
|
||||
activeClientsLock.Lock()
|
||||
defer activeClientsLock.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
// 中文注释: 活跃判断窗口(TTL): 近3分钟内出现过就算“活跃”
|
||||
const activeTTL = 3 * time.Minute
|
||||
for email, ips := range ActiveClientIPs {
|
||||
for ip, lastSeen := range ips {
|
||||
// 中文注释: 如果一个IP超过3分钟没有新的连接日志,我们就认为它已经下线
|
||||
if now.Sub(lastSeen) > activeTTL {
|
||||
delete(ActiveClientIPs[email], ip)
|
||||
}
|
||||
}
|
||||
// 中文注释: 如果一个用户的所有IP都下线了,就从大Map中移除这个用户,节省内存
|
||||
if len(ActiveClientIPs[email]) == 0 {
|
||||
delete(ActiveClientIPs, email)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// parseAccessLog 中文注释: 解析 xray access log 来获取最新的用户IP信息
|
||||
func (j *CheckDeviceLimitJob) parseAccessLog() {
|
||||
logPath, err := xray.GetAccessLogPath()
|
||||
if err != nil || logPath == "none" || logPath == "" {
|
||||
return
|
||||
}
|
||||
|
||||
file, err := os.Open(logPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// 中文注释: 移动到上次读取结束的位置,实现增量读取
|
||||
file.Seek(j.lastPosition, 0)
|
||||
|
||||
scanner := bufio.NewScanner(file)
|
||||
|
||||
// 中文注释: 使用正则表达式从日志行中提取 email 和 IP
|
||||
emailRegex := regexp.MustCompile(`email: ([^ ]+)`)
|
||||
ipRegex := regexp.MustCompile(`from (?:tcp:|udp:)?\[?([0-9a-fA-F\.:]+)\]?:\d+ accepted`)
|
||||
|
||||
activeClientsLock.Lock()
|
||||
defer activeClientsLock.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
|
||||
emailMatch := emailRegex.FindStringSubmatch(line)
|
||||
ipMatch := ipRegex.FindStringSubmatch(line)
|
||||
|
||||
if len(emailMatch) > 1 && len(ipMatch) > 1 {
|
||||
email := emailMatch[1]
|
||||
ip := ipMatch[1]
|
||||
|
||||
if ip == "127.0.0.1" || ip == "::1" {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := ActiveClientIPs[email]; !ok {
|
||||
ActiveClientIPs[email] = make(map[string]time.Time)
|
||||
}
|
||||
ActiveClientIPs[email][ip] = now
|
||||
}
|
||||
}
|
||||
|
||||
currentPosition, err := file.Seek(0, os.SEEK_END)
|
||||
if err == nil {
|
||||
if currentPosition < j.lastPosition {
|
||||
j.lastPosition = 0
|
||||
} else {
|
||||
j.lastPosition = currentPosition
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkAllClientsLimit 中文注释: 核心功能,检查所有用户,对超限的执行封禁,对恢复的执行解封
|
||||
func (j *CheckDeviceLimitJob) checkAllClientsLimit() {
|
||||
db := database.GetDB()
|
||||
var inbounds []*model.Inbound
|
||||
// 中文注释: 这里仅查询启用了设备限制(device_limit > 0)并且自身是开启状态的入站规则
|
||||
db.Where("device_limit > 0 AND enable = ?", true).Find(&inbounds)
|
||||
|
||||
if len(inbounds) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// 中文注释: 获取 API 端口。如果端口为0 (说明Xray未完全启动或有问题),则直接返回
|
||||
apiPort := j.xrayService.GetApiPort()
|
||||
if apiPort == 0 {
|
||||
return
|
||||
}
|
||||
// 中文注释: 使用获取到的端口号初始化 API 客户端
|
||||
j.xrayApi.Init(apiPort)
|
||||
defer j.xrayApi.Close()
|
||||
|
||||
// 中文注释: 优化 - 在一次循环中同时获取 tag 和 protocol
|
||||
inboundInfoMap := make(map[int]struct {
|
||||
Limit int
|
||||
Tag string
|
||||
Protocol model.Protocol
|
||||
})
|
||||
for _, inbound := range inbounds {
|
||||
inboundInfoMap[inbound.Id] = struct {
|
||||
Limit int
|
||||
Tag string
|
||||
Protocol model.Protocol
|
||||
}{Limit: inbound.DeviceLimit, Tag: inbound.Tag, Protocol: inbound.Protocol}
|
||||
}
|
||||
|
||||
activeClientsLock.RLock()
|
||||
clientStatusLock.Lock()
|
||||
defer activeClientsLock.RUnlock()
|
||||
defer clientStatusLock.Unlock()
|
||||
|
||||
// 第一步: 处理当前在线的用户
|
||||
for email, ips := range ActiveClientIPs {
|
||||
traffic, err := j.inboundService.GetClientTrafficByEmail(email)
|
||||
if err != nil || traffic == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
info, ok := inboundInfoMap[traffic.InboundId]
|
||||
if !ok || info.Limit <= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
isBanned := ClientStatus[email]
|
||||
activeIPCount := len(ips)
|
||||
|
||||
// =====================================================================
|
||||
// 设备限制的“观察期”逻辑 (完美解决切换网络误封问题)
|
||||
// =====================================================================
|
||||
|
||||
// 场景 A:用户设备数超限,且当前未被封禁
|
||||
if activeIPCount > info.Limit && !isBanned {
|
||||
j.triggerLock.Lock()
|
||||
startTime, exists := j.violationStartTime[email]
|
||||
|
||||
if !exists {
|
||||
// 如果是第一次发现超限,不要急着封!记录当前时间,开始“观察”
|
||||
j.violationStartTime[email] = time.Now()
|
||||
logger.Infof("〔观察期〕检测到用户 %s 设备超限 (%d > %d),进入3分钟观察期,暂不封禁...", email, activeIPCount, info.Limit)
|
||||
j.triggerLock.Unlock()
|
||||
continue // 跳过本次循环,给用户一点时间(例如切换网络造成的双IP)
|
||||
}
|
||||
|
||||
// 如果已经处于观察期,计算已持续了多久
|
||||
// 【核心设置】:这里设置为 3 分钟 (180秒)。
|
||||
if time.Since(startTime) < 3*time.Minute {
|
||||
j.triggerLock.Unlock()
|
||||
// 还在观察期内,暂不封禁
|
||||
continue
|
||||
}
|
||||
|
||||
// 观察期结束,超限状态依然存在 -> 确认封禁!
|
||||
// 封禁前先清除观察记录,以便下次使用
|
||||
delete(j.violationStartTime, email)
|
||||
j.triggerLock.Unlock()
|
||||
|
||||
// 执行原有的封禁逻辑
|
||||
j.banUser(email, activeIPCount, &info)
|
||||
}
|
||||
|
||||
// 场景 B:用户恢复正常 (IP数 <= 限制),或者已被封禁但现在设备数合规
|
||||
if activeIPCount <= info.Limit {
|
||||
// 如果该用户之前在“观察名单”里,现在正常了,直接移除名单,皆大欢喜
|
||||
j.triggerLock.Lock()
|
||||
if _, exists := j.violationStartTime[email]; exists {
|
||||
delete(j.violationStartTime, email)
|
||||
logger.Infof("〔观察期〕用户 %s 设备数量已恢复正常,观察期取消。", email)
|
||||
}
|
||||
j.triggerLock.Unlock()
|
||||
|
||||
// 如果用户处于被封禁状态,执行解封
|
||||
if isBanned {
|
||||
j.unbanUser(email, activeIPCount, &info)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 第二步: 专门处理那些“已被封禁”但“已不在线”的用户,为他们解封
|
||||
for email, isBanned := range ClientStatus {
|
||||
if !isBanned {
|
||||
continue
|
||||
}
|
||||
if _, online := ActiveClientIPs[email]; !online {
|
||||
traffic, err := j.inboundService.GetClientTrafficByEmail(email)
|
||||
if err != nil || traffic == nil {
|
||||
continue
|
||||
}
|
||||
info, ok := inboundInfoMap[traffic.InboundId]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
logger.Infof("已封禁用户 %s 已完全下线,执行解封操作。", email)
|
||||
|
||||
// 调用解封函数,这种情况下:活跃IP数为0,我们直接传入0用于记录日志
|
||||
j.unbanUser(email, 0, &info)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// banUser 中文注释: 封装的封禁用户函数;IP数量超限,且用户当前未被封禁 -> 执行封禁 (UUID 替换)
|
||||
func (j *CheckDeviceLimitJob) banUser(email string, activeIPCount int, info *struct {
|
||||
Limit int
|
||||
Tag string
|
||||
Protocol model.Protocol
|
||||
}) {
|
||||
// =================================================================
|
||||
// 这一行代码是整个解封逻辑的灵魂!
|
||||
// GetClientByEmail 函数会去查询您的数据库 (x-ui.db),
|
||||
// 找到 `inbounds` 表,解析其中的 `settings` 字段,并从中去,
|
||||
// 读取出您最初设置的、最原始、最正确的用户信息(包括最原始的UUID),
|
||||
// 然后把它赋值给 `client` 这个变量;此时,`client` 变量就持有了那个“老链接”的正确原始 UUID。
|
||||
// =================================================================
|
||||
_, client, err := j.inboundService.GetClientByEmail(email)
|
||||
if err != nil || client == nil {
|
||||
return
|
||||
}
|
||||
logger.Infof("〔设备限制〕超限:用户 %s. 限制: %d, 当前活跃: %d. 执行封禁掐网。", email, info.Limit, activeIPCount)
|
||||
|
||||
// 〔中文注释〕: 以下是发送 Telegram 通知的核心代码,
|
||||
// 它会调用我们注入的 telegramService 的 SendMessage 方法。
|
||||
go func() {
|
||||
// 〔中文注释〕: 在调用前,先判断服务实例是否为 nil,增加代码健壮性。
|
||||
if j.telegramService == nil {
|
||||
return
|
||||
}
|
||||
tgMessage := fmt.Sprintf(
|
||||
"<b>〔X-Panel面板〕设备超限提醒</b>\n\n"+
|
||||
" ------------------------------------\n"+
|
||||
" 👤 用户 Email:%s\n"+
|
||||
" 🖥️ 设备限制数量:%d\n"+
|
||||
" 🌐 当前在线IP数:%d\n"+
|
||||
" ------------------------------------\n\n"+
|
||||
"<b><i>⚠ 该用户已被自动掐网封禁!</i></b>",
|
||||
email, info.Limit, activeIPCount,
|
||||
)
|
||||
// 〔中文注释〕: 调用接口方法发送消息。
|
||||
err := j.telegramService.SendMessage(tgMessage)
|
||||
if err != nil {
|
||||
logger.Warningf("发送 Telegram 封禁通知失败: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
// 中文注释: 步骤一:先从 Xray-Core 中删除该用户。
|
||||
j.xrayApi.RemoveUser(info.Tag, email)
|
||||
|
||||
// =================================================================
|
||||
// 中文注释: 增加 5000 毫秒延时,解决竞态条件问题
|
||||
time.Sleep(5000 * time.Millisecond)
|
||||
// =================================================================
|
||||
|
||||
// 中文注释: 创建一个带有随机UUID/Password的临时客户端配置用于“封禁”
|
||||
tempClient := *client
|
||||
|
||||
// 适用于 VMess/VLESS
|
||||
if tempClient.ID != "" { tempClient.ID = RandomUUID() }
|
||||
|
||||
// 适用于 Trojan/Shadowsocks/Socks
|
||||
if tempClient.Password != "" { tempClient.Password = RandomUUID() }
|
||||
|
||||
var clientMap map[string]interface{}
|
||||
clientJson, _ := json.Marshal(tempClient)
|
||||
json.Unmarshal(clientJson, &clientMap)
|
||||
|
||||
// 中文注释: 步骤二:将这个带有错误UUID/Password的临时用户添加回去。
|
||||
// 客户端持有的还是旧的UUID,自然就无法通过验证,从而达到了“封禁”的效果。
|
||||
err = j.xrayApi.AddUser(string(info.Protocol), info.Tag, clientMap)
|
||||
if err != nil {
|
||||
logger.Warningf("通过API封禁用户 %s 失败: %v", email, err)
|
||||
} else {
|
||||
// 中文注释: 封禁成功后,在内存中标记该用户为“已封禁”状态。
|
||||
ClientStatus[email] = true
|
||||
}
|
||||
}
|
||||
|
||||
// unbanUser 中文注释: 封装的解封用户函数;IP数量已恢复正常,但用户处于封禁状态 -> 执行解封 (恢复原始 UUID)
|
||||
func (j *CheckDeviceLimitJob) unbanUser(email string, activeIPCount int, info *struct {
|
||||
Limit int
|
||||
Tag string
|
||||
Protocol model.Protocol
|
||||
}) {
|
||||
_, client, err := j.inboundService.GetClientByEmail(email)
|
||||
if err != nil || client == nil {
|
||||
return
|
||||
}
|
||||
logger.Infof("〔设备数量〕已恢复:用户 %s. 限制: %d, 当前活跃: %d. 执行解封/恢复用户。", email, info.Limit, activeIPCount)
|
||||
|
||||
// 中文注释: 步骤一:先从 Xray-Core 中删除用于“封禁”的那个临时用户。
|
||||
j.xrayApi.RemoveUser(info.Tag, email)
|
||||
|
||||
// =================================================================
|
||||
// 中文注释: 同样增加 5000 毫秒延时,确保解封操作的稳定性
|
||||
time.Sleep(5000 * time.Millisecond)
|
||||
// =================================================================
|
||||
|
||||
var clientMap map[string]interface{}
|
||||
clientJson, _ := json.Marshal(client)
|
||||
json.Unmarshal(clientJson, &clientMap)
|
||||
|
||||
// 中文注释: 步骤二:将数据库中原始的、正确的用户信息重新添加回 Xray-Core,从而实现“解封”。
|
||||
err = j.xrayApi.AddUser(string(info.Protocol), info.Tag, clientMap)
|
||||
if err != nil {
|
||||
logger.Warningf("通过API恢复用户 %s 失败: %v", email, err)
|
||||
} else {
|
||||
// 中文注释: 解封成功后,从内存中移除该用户的“已封禁”状态标记。
|
||||
delete(ClientStatus, email)
|
||||
}
|
||||
}
|
||||
|
||||
type CheckClientIpJob struct {
|
||||
lastClear int64
|
||||
disAllowedIps []string
|
||||
}
|
||||
|
||||
var job *CheckClientIpJob
|
||||
|
||||
func NewCheckClientIpJob() *CheckClientIpJob {
|
||||
job = new(CheckClientIpJob)
|
||||
return job
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) Run() {
|
||||
if j.lastClear == 0 {
|
||||
j.lastClear = time.Now().Unix()
|
||||
}
|
||||
|
||||
shouldClearAccessLog := false
|
||||
iplimitActive := j.hasLimitIp()
|
||||
f2bInstalled := j.checkFail2BanInstalled()
|
||||
isAccessLogAvailable := j.checkAccessLogAvailable(iplimitActive)
|
||||
|
||||
if isAccessLogAvailable {
|
||||
if runtime.GOOS == "windows" {
|
||||
if iplimitActive {
|
||||
shouldClearAccessLog = j.processLogFile()
|
||||
}
|
||||
} else {
|
||||
if iplimitActive {
|
||||
if f2bInstalled {
|
||||
shouldClearAccessLog = j.processLogFile()
|
||||
} else {
|
||||
if !f2bInstalled {
|
||||
logger.Warning("[LimitIP] Fail2Ban is not installed, Please install Fail2Ban from the x-ui bash menu.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if shouldClearAccessLog || (isAccessLogAvailable && time.Now().Unix()-j.lastClear > 3600) {
|
||||
j.clearAccessLog()
|
||||
}
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) clearAccessLog() {
|
||||
logAccessP, err := os.OpenFile(xray.GetAccessPersistentLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
|
||||
j.checkError(err)
|
||||
defer logAccessP.Close()
|
||||
|
||||
accessLogPath, err := xray.GetAccessLogPath()
|
||||
j.checkError(err)
|
||||
|
||||
file, err := os.Open(accessLogPath)
|
||||
j.checkError(err)
|
||||
defer file.Close()
|
||||
|
||||
_, err = io.Copy(logAccessP, file)
|
||||
j.checkError(err)
|
||||
|
||||
err = os.Truncate(accessLogPath, 0)
|
||||
j.checkError(err)
|
||||
|
||||
j.lastClear = time.Now().Unix()
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) hasLimitIp() bool {
|
||||
db := database.GetDB()
|
||||
var inbounds []*model.Inbound
|
||||
|
||||
err := db.Model(model.Inbound{}).Find(&inbounds).Error
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, inbound := range inbounds {
|
||||
if inbound.Settings == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
settings := map[string][]model.Client{}
|
||||
json.Unmarshal([]byte(inbound.Settings), &settings)
|
||||
clients := settings["clients"]
|
||||
|
||||
for _, client := range clients {
|
||||
limitIp := client.LimitIP
|
||||
if limitIp > 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) processLogFile() bool {
|
||||
|
||||
ipRegex := regexp.MustCompile(`from (?:tcp:|udp:)?\[?([0-9a-fA-F\.:]+)\]?:\d+ accepted`)
|
||||
emailRegex := regexp.MustCompile(`email: (.+)$`)
|
||||
|
||||
accessLogPath, _ := xray.GetAccessLogPath()
|
||||
file, _ := os.Open(accessLogPath)
|
||||
defer file.Close()
|
||||
|
||||
inboundClientIps := make(map[string]map[string]struct{}, 100)
|
||||
|
||||
scanner := bufio.NewScanner(file)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
|
||||
ipMatches := ipRegex.FindStringSubmatch(line)
|
||||
if len(ipMatches) < 2 {
|
||||
continue
|
||||
}
|
||||
|
||||
ip := ipMatches[1]
|
||||
|
||||
if ip == "127.0.0.1" || ip == "::1" {
|
||||
continue
|
||||
}
|
||||
|
||||
emailMatches := emailRegex.FindStringSubmatch(line)
|
||||
if len(emailMatches) < 2 {
|
||||
continue
|
||||
}
|
||||
email := emailMatches[1]
|
||||
|
||||
if _, exists := inboundClientIps[email]; !exists {
|
||||
inboundClientIps[email] = make(map[string]struct{})
|
||||
}
|
||||
inboundClientIps[email][ip] = struct{}{}
|
||||
}
|
||||
|
||||
shouldCleanLog := false
|
||||
for email, uniqueIps := range inboundClientIps {
|
||||
|
||||
ips := make([]string, 0, len(uniqueIps))
|
||||
for ip := range uniqueIps {
|
||||
ips = append(ips, ip)
|
||||
}
|
||||
sort.Strings(ips)
|
||||
|
||||
clientIpsRecord, err := j.getInboundClientIps(email)
|
||||
if err != nil {
|
||||
j.addInboundClientIps(email, ips)
|
||||
continue
|
||||
}
|
||||
|
||||
shouldCleanLog = j.updateInboundClientIps(clientIpsRecord, email, ips) || shouldCleanLog
|
||||
}
|
||||
|
||||
return shouldCleanLog
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) checkFail2BanInstalled() bool {
|
||||
cmd := "fail2ban-client"
|
||||
args := []string{"-h"}
|
||||
err := exec.Command(cmd, args...).Run()
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) checkAccessLogAvailable(iplimitActive bool) bool {
|
||||
accessLogPath, err := xray.GetAccessLogPath()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if accessLogPath == "none" || accessLogPath == "" {
|
||||
if iplimitActive {
|
||||
logger.Warning("[LimitIP] Access log path is not set, Please configure the access log path in Xray configs.")
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) checkError(e error) {
|
||||
if e != nil {
|
||||
logger.Warning("client ip job err:", e)
|
||||
}
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) getInboundClientIps(clientEmail string) (*model.InboundClientIps, error) {
|
||||
db := database.GetDB()
|
||||
InboundClientIps := &model.InboundClientIps{}
|
||||
err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return InboundClientIps, nil
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) addInboundClientIps(clientEmail string, ips []string) error {
|
||||
inboundClientIps := &model.InboundClientIps{}
|
||||
jsonIps, err := json.Marshal(ips)
|
||||
j.checkError(err)
|
||||
|
||||
inboundClientIps.ClientEmail = clientEmail
|
||||
inboundClientIps.Ips = string(jsonIps)
|
||||
|
||||
db := database.GetDB()
|
||||
tx := db.Begin()
|
||||
|
||||
defer func() {
|
||||
if err == nil {
|
||||
tx.Commit()
|
||||
} else {
|
||||
tx.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
err = tx.Save(inboundClientIps).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) updateInboundClientIps(inboundClientIps *model.InboundClientIps, clientEmail string, ips []string) bool {
|
||||
jsonIps, err := json.Marshal(ips)
|
||||
if err != nil {
|
||||
logger.Error("failed to marshal IPs to JSON:", err)
|
||||
return false
|
||||
}
|
||||
|
||||
inboundClientIps.ClientEmail = clientEmail
|
||||
inboundClientIps.Ips = string(jsonIps)
|
||||
|
||||
inbound, err := j.getInboundByEmail(clientEmail)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to fetch inbound settings for email %s: %s", clientEmail, err)
|
||||
return false
|
||||
}
|
||||
|
||||
if inbound.Settings == "" {
|
||||
logger.Debug("wrong data:", inbound)
|
||||
return false
|
||||
}
|
||||
|
||||
settings := map[string][]model.Client{}
|
||||
json.Unmarshal([]byte(inbound.Settings), &settings)
|
||||
clients := settings["clients"]
|
||||
shouldCleanLog := false
|
||||
j.disAllowedIps = []string{}
|
||||
|
||||
logIpFile, err := os.OpenFile(xray.GetIPLimitLogPath(), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to open IP limit log file: %s", err)
|
||||
return false
|
||||
}
|
||||
defer logIpFile.Close()
|
||||
log.SetOutput(logIpFile)
|
||||
log.SetFlags(log.LstdFlags)
|
||||
|
||||
for _, client := range clients {
|
||||
if client.Email == clientEmail {
|
||||
limitIp := client.LimitIP
|
||||
|
||||
if limitIp > 0 && inbound.Enable {
|
||||
shouldCleanLog = true
|
||||
|
||||
if limitIp < len(ips) {
|
||||
j.disAllowedIps = append(j.disAllowedIps, ips[limitIp:]...)
|
||||
for i := limitIp; i < len(ips); i++ {
|
||||
log.Printf("[LIMIT_IP] Email = %s || SRC = %s", clientEmail, ips[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sort.Strings(j.disAllowedIps)
|
||||
|
||||
if len(j.disAllowedIps) > 0 {
|
||||
logger.Debug("disAllowedIps:", j.disAllowedIps)
|
||||
}
|
||||
|
||||
db := database.GetDB()
|
||||
err = db.Save(inboundClientIps).Error
|
||||
if err != nil {
|
||||
logger.Error("failed to save inboundClientIps:", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return shouldCleanLog
|
||||
}
|
||||
|
||||
func (j *CheckClientIpJob) getInboundByEmail(clientEmail string) (*model.Inbound, error) {
|
||||
db := database.GetDB()
|
||||
inbound := &model.Inbound{}
|
||||
|
||||
err := db.Model(&model.Inbound{}).Where("settings LIKE ?", "%"+clientEmail+"%").First(inbound).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return inbound, nil
|
||||
}
|
||||
54
web/job/check_cpu_usage.go
Normal file
54
web/job/check_cpu_usage.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"x-ui/web/service"
|
||||
|
||||
"github.com/shirou/gopsutil/v4/cpu"
|
||||
)
|
||||
|
||||
// 连续超阈值告警实现
|
||||
type CheckCpuJob struct {
|
||||
tgbotService service.Tgbot
|
||||
settingService service.SettingService
|
||||
overThresholdCount int // 连续超阈值计数器
|
||||
lastNotifyTime time.Time // 最近一次告警时间
|
||||
}
|
||||
|
||||
func NewCheckCpuJob() *CheckCpuJob {
|
||||
return &CheckCpuJob{}
|
||||
}
|
||||
|
||||
// run 是 Job 接口方法
|
||||
func (j *CheckCpuJob) Run() {
|
||||
threshold, _ := j.settingService.GetTgCpu()
|
||||
notifyInterval := 10 * time.Minute // 两次告警最小间隔,可做成配置项
|
||||
|
||||
percent, err := cpu.Percent(10*time.Second, false) // 10秒采样
|
||||
if err != nil || len(percent) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
if percent[0] > float64(threshold) {
|
||||
j.overThresholdCount++
|
||||
} else {
|
||||
j.overThresholdCount = 0
|
||||
}
|
||||
|
||||
// 连续3次超阈值,且距离上次告警超过告警间隔
|
||||
if j.overThresholdCount >= 3 && now.Sub(j.lastNotifyTime) > notifyInterval {
|
||||
msg := j.tgbotService.I18nBot(
|
||||
"tgbot.messages.cpuThreshold",
|
||||
"Percent=="+strconv.FormatFloat(percent[0], 'f', 2, 64),
|
||||
"Threshold=="+strconv.Itoa(threshold),
|
||||
"SampleInterval==10s",
|
||||
"NotifyPolicy==连续3次超阈值",
|
||||
)
|
||||
j.tgbotService.SendMsgToTgbotAdmins(msg)
|
||||
j.lastNotifyTime = now
|
||||
j.overThresholdCount = 0
|
||||
}
|
||||
}
|
||||
19
web/job/check_hash_storage.go
Normal file
19
web/job/check_hash_storage.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"x-ui/web/service"
|
||||
)
|
||||
|
||||
type CheckHashStorageJob struct {
|
||||
tgbotService service.Tgbot
|
||||
}
|
||||
|
||||
func NewCheckHashStorageJob() *CheckHashStorageJob {
|
||||
return new(CheckHashStorageJob)
|
||||
}
|
||||
|
||||
// Here Run is an interface method of the Job interface
|
||||
func (j *CheckHashStorageJob) Run() {
|
||||
// Remove expired hashes from storage
|
||||
j.tgbotService.GetHashStorage().RemoveExpiredHashes()
|
||||
}
|
||||
32
web/job/check_xray_running_job.go
Normal file
32
web/job/check_xray_running_job.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"x-ui/logger"
|
||||
"x-ui/web/service"
|
||||
)
|
||||
|
||||
type CheckXrayRunningJob struct {
|
||||
xrayService service.XrayService
|
||||
|
||||
checkTime int
|
||||
}
|
||||
|
||||
func NewCheckXrayRunningJob() *CheckXrayRunningJob {
|
||||
return new(CheckXrayRunningJob)
|
||||
}
|
||||
|
||||
func (j *CheckXrayRunningJob) Run() {
|
||||
if !j.xrayService.DidXrayCrash() {
|
||||
j.checkTime = 0
|
||||
} else {
|
||||
j.checkTime++
|
||||
// only restart if it's down 2 times in a row
|
||||
if j.checkTime > 1 {
|
||||
err := j.xrayService.RestartXray(false)
|
||||
j.checkTime = 0
|
||||
if err != nil {
|
||||
logger.Error("Restart xray failed:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
76
web/job/clear_logs_job.go
Normal file
76
web/job/clear_logs_job.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"x-ui/logger"
|
||||
"x-ui/xray"
|
||||
)
|
||||
|
||||
type ClearLogsJob struct{}
|
||||
|
||||
func NewClearLogsJob() *ClearLogsJob {
|
||||
return new(ClearLogsJob)
|
||||
}
|
||||
|
||||
// ensureFileExists creates the necessary directories and file if they don't exist
|
||||
func ensureFileExists(path string) error {
|
||||
dir := filepath.Dir(path)
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
file.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Here Run is an interface method of the Job interface
|
||||
func (j *ClearLogsJob) Run() {
|
||||
logFiles := []string{xray.GetIPLimitLogPath(), xray.GetIPLimitBannedLogPath(), xray.GetAccessPersistentLogPath()}
|
||||
logFilesPrev := []string{xray.GetIPLimitBannedPrevLogPath(), xray.GetAccessPersistentPrevLogPath()}
|
||||
|
||||
// Ensure all log files and their paths exist
|
||||
for _, path := range append(logFiles, logFilesPrev...) {
|
||||
if err := ensureFileExists(path); err != nil {
|
||||
logger.Warning("Failed to ensure log file exists:", path, "-", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Clear log files and copy to previous logs
|
||||
for i := 0; i < len(logFiles); i++ {
|
||||
if i > 0 {
|
||||
// Copy to previous logs
|
||||
logFilePrev, err := os.OpenFile(logFilesPrev[i-1], os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
logger.Warning("Failed to open previous log file for writing:", logFilesPrev[i-1], "-", err)
|
||||
continue
|
||||
}
|
||||
|
||||
logFile, err := os.OpenFile(logFiles[i], os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
logger.Warning("Failed to open current log file for reading:", logFiles[i], "-", err)
|
||||
logFilePrev.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = io.Copy(logFilePrev, logFile)
|
||||
if err != nil {
|
||||
logger.Warning("Failed to copy log file:", logFiles[i], "to", logFilesPrev[i-1], "-", err)
|
||||
}
|
||||
|
||||
logFile.Close()
|
||||
logFilePrev.Close()
|
||||
}
|
||||
|
||||
err := os.Truncate(logFiles[i], 0)
|
||||
if err != nil {
|
||||
logger.Warning("Failed to truncate log file:", logFiles[i], "-", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
29
web/job/stats_notify_job.go
Normal file
29
web/job/stats_notify_job.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"x-ui/web/service"
|
||||
)
|
||||
|
||||
type LoginStatus byte
|
||||
|
||||
const (
|
||||
LoginSuccess LoginStatus = 1
|
||||
LoginFail LoginStatus = 0
|
||||
)
|
||||
|
||||
type StatsNotifyJob struct {
|
||||
xrayService service.XrayService
|
||||
tgbotService service.Tgbot
|
||||
}
|
||||
|
||||
func NewStatsNotifyJob() *StatsNotifyJob {
|
||||
return new(StatsNotifyJob)
|
||||
}
|
||||
|
||||
// Here run is a interface method of Job interface
|
||||
func (j *StatsNotifyJob) Run() {
|
||||
if !j.xrayService.IsXrayRunning() {
|
||||
return
|
||||
}
|
||||
j.tgbotService.SendReport()
|
||||
}
|
||||
71
web/job/xray_traffic_job.go
Normal file
71
web/job/xray_traffic_job.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"x-ui/logger"
|
||||
"x-ui/web/service"
|
||||
"x-ui/xray"
|
||||
|
||||
"github.com/valyala/fasthttp"
|
||||
)
|
||||
|
||||
type XrayTrafficJob struct {
|
||||
settingService service.SettingService
|
||||
xrayService service.XrayService
|
||||
inboundService service.InboundService
|
||||
outboundService service.OutboundService
|
||||
}
|
||||
|
||||
func NewXrayTrafficJob() *XrayTrafficJob {
|
||||
return new(XrayTrafficJob)
|
||||
}
|
||||
|
||||
func (j *XrayTrafficJob) Run() {
|
||||
if !j.xrayService.IsXrayRunning() {
|
||||
return
|
||||
}
|
||||
traffics, clientTraffics, err := j.xrayService.GetXrayTraffic()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err, needRestart0 := j.inboundService.AddTraffic(traffics, clientTraffics)
|
||||
if err != nil {
|
||||
logger.Warning("add inbound traffic failed:", err)
|
||||
}
|
||||
err, needRestart1 := j.outboundService.AddTraffic(traffics, clientTraffics)
|
||||
if err != nil {
|
||||
logger.Warning("add outbound traffic failed:", err)
|
||||
}
|
||||
if ExternalTrafficInformEnable, err := j.settingService.GetExternalTrafficInformEnable(); ExternalTrafficInformEnable {
|
||||
j.informTrafficToExternalAPI(traffics, clientTraffics)
|
||||
} else if err != nil {
|
||||
logger.Warning("get ExternalTrafficInformEnable failed:", err)
|
||||
}
|
||||
if needRestart0 || needRestart1 {
|
||||
j.xrayService.SetToNeedRestart()
|
||||
}
|
||||
}
|
||||
|
||||
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {
|
||||
informURL, err := j.settingService.GetExternalTrafficInformURI()
|
||||
if err != nil {
|
||||
logger.Warning("get ExternalTrafficInformURI failed:", err)
|
||||
return
|
||||
}
|
||||
requestBody, err := json.Marshal(map[string]any{"clientTraffics": clientTraffics, "inboundTraffics": inboundTraffics})
|
||||
if err != nil {
|
||||
logger.Warning("parse client/inbound traffic failed:", err)
|
||||
return
|
||||
}
|
||||
request := fasthttp.AcquireRequest()
|
||||
defer fasthttp.ReleaseRequest(request)
|
||||
request.Header.SetMethod("POST")
|
||||
request.Header.SetContentType("application/json; charset=UTF-8")
|
||||
request.SetBody([]byte(requestBody))
|
||||
request.SetRequestURI(informURL)
|
||||
response := fasthttp.AcquireResponse()
|
||||
defer fasthttp.ReleaseResponse(response)
|
||||
if err := fasthttp.Do(request, response); err != nil {
|
||||
logger.Warning("POST ExternalTrafficInformURI failed:", err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user