Organizing OpenBMCLAPI Node Logs

Background

I wanted to organize the logs from my openbmclapi node and export the .log files into a .db file for data analysis and API development.
My first idea was to export CSV and analyze the data in Excel, but an Excel sheet tops out at 1,048,576 rows × 16,384 columns.
Then I thought of Microsoft Access, Excel's Office sibling, which might cope with a large volume of log data; on reflection, it didn't seem suitable for production.
The main reason for skipping Access, though, is that I barely know it; I'm far more comfortable with MySQL, PostgreSQL, and SQLite.
After some thought I settled on SQLite locally and PostgreSQL in production; the biggest reason for choosing PostgreSQL is simply that I wanted to try a different database.
Let's see how I solved this with Python QWQ ~

First Attempt

I need to process 176 .log files. They contain a lot of complex content; I only want lines matching one fixed format, everything else discarded, with the results written to CSV files.

Goals:

  • Accept an input directory (containing many .log files)
  • Read every .log file automatically
  • Extract the key fields from each line
  • Write the output to .csv files

Sample log line:

# from openbmclapi_2025-05-01_020001_log.log

::ffff:101.85.7.200 - - [01/May/2025:02:00:05 +0000] "GET /download/523fa864bde9ad0153d95cba82c8d155b3294968?name=523fa864bde9ad0153d95cba82c8d155b3294968&s=HM8-RY3WJWPO9_D0UnuVHUoCoEU&e=ma4q2xop HTTP/1.1" 206 11924 "-" "PCL2/2.9.3.50"

Extracted fields:

ip          101.85.7.200
datetime    01/May/2025:02:00:05 +0000
method      GET
path        /download/523fa864bde9ad0153d95cba82c8d155b3294968
status      206
size        11924
user_agent  PCL2/2.9.3.50
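
As a sanity check, here is a minimal sketch running the same LOG_PATTERN used by the scripts below against the sample line (the path group stops at the ? so the query string isn't swallowed, matching the field table above):

Python
import re

LOG_PATTERN = re.compile(
    r'::ffff:(?P<ip>[\d\.]+) - - \[(?P<datetime>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>[^?\s]+)(?:\?\S*)? HTTP/[\d\.]+" '
    r'(?P<status>\d+) (?P<size>\d+) "-" "(?P<user_agent>[^"]+)"'
)

line = ('::ffff:101.85.7.200 - - [01/May/2025:02:00:05 +0000] '
        '"GET /download/523fa864bde9ad0153d95cba82c8d155b3294968'
        '?name=523fa864bde9ad0153d95cba82c8d155b3294968'
        '&s=HM8-RY3WJWPO9_D0UnuVHUoCoEU&e=ma4q2xop HTTP/1.1" '
        '206 11924 "-" "PCL2/2.9.3.50"')

m = LOG_PATTERN.search(line)
print(m.group('ip'), m.group('method'), m.group('path'), m.group('status'))
# -> 101.85.7.200 GET /download/523fa864bde9ad0153d95cba82c8d155b3294968 206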

Python script one

Python
import os
import re
import csv

# Regex matching the access-log line format
LOG_PATTERN = re.compile(
    r'::ffff:(?P<ip>[\d\.]+) - - \[(?P<datetime>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>[^?\s]+)(?:\?\S*)? HTTP/[\d\.]+" '
    r'(?P<status>\d+) (?P<size>\d+) "-" "(?P<user_agent>[^"]+)"'
)

def extract_data_from_log(file_path):
    """Extract all matching lines from a single log file."""
    results = []
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            match = LOG_PATTERN.search(line)
            if match:
                results.append(match.groupdict())
    return results


def process_logs(input_dir, output_dir):
    """Process the log files in batch and export one CSV per file."""
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for file in os.listdir(input_dir):
        if file.endswith(".log"):
            log_path = os.path.join(input_dir, file)
            csv_path = os.path.join(output_dir, file.replace(".log", ".csv"))

            print(f"Processing {file} ...")
            data = extract_data_from_log(log_path)

            if data:
                with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
                    writer = csv.DictWriter(csvfile, fieldnames=data[0].keys())
                    writer.writeheader()
                    writer.writerows(data)
                print(f"Exported: {csv_path} ({len(data)} rows)")
            else:
                print(f"No matching data in: {file}")


if __name__ == "__main__":
    input_dir = input("Log folder path: ").strip()
    output_dir = input("Output CSV folder path: ").strip()
    process_logs(input_dir, output_dir)

Python script two

Python
import os
import re
import csv

# Regex matching the access-log line format
LOG_PATTERN = re.compile(
    r'::ffff:(?P<ip>[\d\.]+) - - \[(?P<datetime>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>[^?\s]+)(?:\?\S*)? HTTP/[\d\.]+" '
    r'(?P<status>\d+) (?P<size>\d+) "-" "(?P<user_agent>[^"]+)"'
)

def extract_data_from_log(file_path):
    """Extract all matching lines from a single log file."""
    results = []
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            match = LOG_PATTERN.search(line)
            if match:
                data = match.groupdict()
                data['filename'] = os.path.basename(file_path)  # record the source file
                results.append(data)
    return results

def process_logs_to_one_csv(input_dir, output_csv):
    """Process the log files in batch and merge them into a single CSV."""
    all_data = []

    for file in os.listdir(input_dir):
        if file.endswith(".log"):
            log_path = os.path.join(input_dir, file)
            print(f"🔍 Processing {file} ...")
            data = extract_data_from_log(log_path)
            if data:
                all_data.extend(data)
            else:
                print(f"No matching data in: {file}")

    if all_data:
        print(f"Extracted {len(all_data)} rows in total, writing {output_csv} ...")
        with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = ['filename', 'ip', 'datetime', 'method', 'path', 'status', 'size', 'user_agent']
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(all_data)
        print(f"Exported: {output_csv}")
    else:
        print("No valid log data extracted.")

if __name__ == "__main__":
    input_dir = input("Log folder path: ").strip()
    output_csv = input("Output CSV file path (e.g. D:\\all.csv): ").strip()
    process_logs_to_one_csv(input_dir, output_csv)

Reflection

After running both scripts I found that small log files open fine in Excel, but any file beyond Excel's limit of 1,048,576 rows triggers the warning: "This data set is too large for the Excel grid. If you save this workbook, data that wasn't loaded will be lost."

[Screenshot: Excel's "data set too large for the grid" warning, 2025-11-04]
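
A quick way to know in advance whether a CSV will be truncated is simply to count its lines before opening it in Excel; a minimal sketch (the path in the usage comment is a placeholder):

Python
EXCEL_MAX_ROWS = 1_048_576  # hard limit of the Excel grid

def fits_in_excel(csv_path: str) -> bool:
    """Count lines (header included) and compare against Excel's row limit."""
    with open(csv_path, 'r', encoding='utf-8', errors='ignore') as f:
        rows = sum(1 for _ in f)
    print(f"{csv_path}: {rows} rows")
    return rows <= EXCEL_MAX_ROWS

# fits_in_excel("D:\\all.csv")  # e.g. the merged CSV produced by script two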

Second Attempt

Since Excel can't go past 1,048,576 rows, I'll use SQLite directly; local testing doesn't need PostgreSQL yet.

SQLite limits:

  • Maximum database size: 140 TB
  • Rows per table: about 2^64 (18,446,744,073,709,551,616)
  • Row size: about 1 GB by default, up to 2^31-1 bytes (compile-time option)
  • Columns: 2,000 by default, up to 32,767 (compile-time option)
  • Indexes: up to 64 per table
(The sketch below shows how to check the limits of your local build.)
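
Most of these are compile-time limits; on Python 3.11+ you can ask the SQLite build bundled with your interpreter what it actually enforces, a quick sketch:

Python
import sqlite3

conn = sqlite3.connect(":memory:")

# Runtime limits of this particular SQLite build (getlimit needs Python 3.11+)
print("max row/blob bytes:", conn.getlimit(sqlite3.SQLITE_LIMIT_LENGTH))
print("max columns:", conn.getlimit(sqlite3.SQLITE_LIMIT_COLUMN))

# Compile-time options baked into the library
for (opt,) in conn.execute("PRAGMA compile_options"):
    print(opt)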

Improvement goals

  • Read .log files in batch
  • Parse log lines (same regex as before)
  • Write into SQLite (auto-create the table, then insert)
  • Tag every record with its source file name

Python code

Python
import os
import re
import sqlite3

# Regex matching the access-log line format
LOG_PATTERN = re.compile(
    r'::ffff:(?P<ip>[\d\.]+) - - \[(?P<datetime>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>[^?\s]+)(?:\?\S*)? HTTP/[\d\.]+" '
    r'(?P<status>\d+) (?P<size>\d+) "-" "(?P<user_agent>[^"]+)"'
)

def init_db(db_path):
    """Create the SQLite database and the logs table."""
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT,
            ip TEXT,
            datetime TEXT,
            method TEXT,
            path TEXT,
            status INTEGER,
            size INTEGER,
            user_agent TEXT
        )
    """)
    conn.commit()
    return conn

def insert_data(conn, data):
    """Bulk-insert log records."""
    c = conn.cursor()
    c.executemany("""
        INSERT INTO logs (filename, ip, datetime, method, path, status, size, user_agent)
        VALUES (:filename, :ip, :datetime, :method, :path, :status, :size, :user_agent)
    """, data)
    conn.commit()

def extract_data_from_log(file_path):
    """Extract all matching lines from a single log file."""
    results = []
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            match = LOG_PATTERN.search(line)
            if match:
                record = match.groupdict()
                record["filename"] = os.path.basename(file_path)
                results.append(record)
    return results

def process_logs_to_sqlite(input_dir, db_path):
    """Process the log files in batch and write them into SQLite."""
    conn = init_db(db_path)
    total = 0

    for file in os.listdir(input_dir):
        if file.endswith(".log"):
            log_path = os.path.join(input_dir, file)
            print(f"Processing {file} ...")
            data = extract_data_from_log(log_path)
            if data:
                insert_data(conn, data)
                total += len(data)
                print(f"Imported {len(data)} rows")
            else:
                print(f"No valid log lines in: {file}")

    conn.close()
    print(f"All logs processed; {total} records imported into {db_path}")

if __name__ == "__main__":
    input_dir = input("Log folder path: ").strip()
    db_path = input("Output SQLite database path (e.g. D:\\logs.db): ").strip()
    process_logs_to_sqlite(input_dir, db_path)

SQLite table schema

id          INTEGER   auto-increment primary key
filename    TEXT      access_20250501.log
ip          TEXT      101.85.7.200
datetime    TEXT      01/May/2025:02:00:05 +0000
method      TEXT      GET
path        TEXT      /download/523fa864
status      INTEGER   206
size        INTEGER   11924
user_agent  TEXT      PCL2/2.9.3.50
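
With the data in SQLite, ad-hoc analysis is a query away; a small sketch (logs.db is a placeholder path) counting requests per user agent against the logs table above:

Python
import sqlite3

conn = sqlite3.connect("logs.db")  # placeholder path
for ua, n in conn.execute("""
    SELECT user_agent, COUNT(*) AS n
    FROM logs
    GROUP BY user_agent
    ORDER BY n DESC
    LIMIT 10
"""):
    print(f"{n:>8}  {ua}")
conn.close()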

Third Attempt

Looking at the code again, single-threaded felt slow; switch to multithreading and import speed should take off ~
After a moment's thought, I reworked the features:

  • Multithreading: process several .log files in parallel for speed
  • Resumable: if a .log file has already been imported (its table exists), it is skipped automatically
  • Single database: every .log file goes into the same SQLite database
  • One table per file: the table name is derived from the file name (special characters stripped)
  • Timestamps: [01/May/2025:02:00:05 +0000] is stored as a UNIX timestamp (seconds)
  • Everything else unchanged: the fields stay ip, datetime, method, path, status, size, user_agent

SQLite table schema

id          INTEGER   primary key
ip          TEXT      client IP
datetime    INTEGER   UNIX timestamp
method      TEXT      HTTP method
path        TEXT      request path
status      INTEGER   status code
size        INTEGER   response size
user_agent  TEXT      UA string

Python code

Python
import os
import re
import sqlite3
import threading
import calendar
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

# Regex matching the access-log line format
LOG_PATTERN = re.compile(
    r'::ffff:(?P<ip>[\d\.]+) - - \[(?P<datetime>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>[^?\s]+)(?:\?\S*)? HTTP/[\d\.]+" '
    r'(?P<status>\d+) (?P<size>\d+) "-" "(?P<user_agent>[^"]+)"'
)

# Month name mapping (log dates look like 01/May/2025)
MONTH_MAP = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
    'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}

lock = threading.Lock()

def parse_datetime_to_unix(dt_str):
    """
    Convert a log timestamp string to a UNIX timestamp,
    e.g. 01/May/2025:02:00:05 +0000
    """
    try:
        date_part, tz = dt_str.split(' ')
        day, month_str, year_time = date_part.split('/')
        year, time_part = year_time.split(':', 1)
        month = MONTH_MAP[month_str]
        dt = datetime.strptime(f"{year}-{month:02d}-{day} {time_part}", "%Y-%m-%d %H:%M:%S")
        # The offset in these logs is +0000, so convert as UTC (not local time)
        return int(calendar.timegm(dt.timetuple()))
    except Exception:
        return None

def extract_data_from_log(file_path):
    """Extract all matching lines from a single log file."""
    results = []
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            match = LOG_PATTERN.search(line)
            if match:
                g = match.groupdict()
                ts = parse_datetime_to_unix(g["datetime"])
                if ts:
                    results.append({
                        "ip": g["ip"],
                        "datetime": ts,
                        "method": g["method"],
                        "path": g["path"],
                        "status": int(g["status"]),
                        "size": int(g["size"]),
                        "user_agent": g["user_agent"]
                    })
    return results

def sanitize_table_name(filename):
    """Build a safe table name from a file name."""
    name = os.path.splitext(filename)[0]
    # Keep only letters, digits and underscores
    name = re.sub(r'[^0-9a-zA-Z_]', '_', name)
    return name

def create_table_if_not_exists(conn, table_name):
    """Create the per-file table."""
    with lock:
        c = conn.cursor()
        c.execute(f"""
            CREATE TABLE IF NOT EXISTS "{table_name}" (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip TEXT,
                datetime INTEGER,
                method TEXT,
                path TEXT,
                status INTEGER,
                size INTEGER,
                user_agent TEXT
            )
        """)
        conn.commit()

def table_exists(conn, table_name):
    """Check whether a table already exists."""
    c = conn.cursor()
    c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
    return c.fetchone() is not None

def insert_data(conn, table_name, data):
    """Insert the extracted records."""
    with lock:
        c = conn.cursor()
        c.executemany(
            f'INSERT INTO "{table_name}" (ip, datetime, method, path, status, size, user_agent) '
            f'VALUES (:ip, :datetime, :method, :path, :status, :size, :user_agent)',
            data
        )
        conn.commit()

def process_single_file(db_path, log_file):
    """Process one log file."""
    conn = sqlite3.connect(db_path, check_same_thread=False)
    table_name = sanitize_table_name(os.path.basename(log_file))

    # Resume support: skip the file if its table already exists
    if table_exists(conn, table_name):
        print(f"Table {table_name} already exists, skipping {os.path.basename(log_file)}")
        conn.close()
        return 0

    data = extract_data_from_log(log_file)
    if not data:
        print(f"No valid data: {log_file}")
        conn.close()
        return 0

    create_table_if_not_exists(conn, table_name)
    insert_data(conn, table_name, data)
    conn.close()
    print(f"{table_name}: imported {len(data)} rows")
    return len(data)

def process_logs_multithread(input_dir, db_path, max_workers=4):
    """Process every log in the directory with a thread pool."""
    log_files = [
        os.path.join(input_dir, f)
        for f in os.listdir(input_dir)
        if f.endswith(".log")
    ]

    total = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_single_file, db_path, lf): lf for lf in log_files}
        for future in as_completed(futures):
            try:
                total += future.result()
            except Exception as e:
                print(f"Error while processing {futures[future]}: {e}")

    print(f"\nDone; {total} records imported into {db_path}")

if __name__ == "__main__":
    input_dir = input("Log folder path: ").strip()
    db_path = input("SQLite database path (e.g. D:\\logs.db): ").strip()
    threads = input("Number of worker threads (default 4): ").strip()
    threads = int(threads) if threads.isdigit() else 4

    process_logs_multithread(input_dir, db_path, max_workers=threads)

Result

  1. Log files are scanned in parallel
  2. Each file gets its own table (e.g. openbmclapi_2025_03_13_020002_log)
  3. If a table already exists, it is skipped (resume support)
  4. Log timestamps are converted to UNIX timestamps (integers)

Fourth Attempt

Every table name looks like "openbmclapi_2025_03_13_020002_log", which is a bit long. Strip the "openbmclapi_" prefix and the "020002_log" suffix, then add a "log" prefix, and you get "log_2025_03_13": "log" marks it as a log table and "2025_03_13" is the date. This also avoids table names starting with a digit, keeping them valid identifiers.
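
A tiny sketch of that transformation (mirroring the extract_table_name_from_filename helper in the code below):

Python
import re

def to_table_name(filename: str) -> str:
    """openbmclapi_2025-03-13_020002_log.log -> log_2025_03_13"""
    m = re.search(r'(\d{4})[-_](\d{2})[-_](\d{2})', filename)
    return f"log_{m.group(1)}_{m.group(2)}_{m.group(3)}" if m else "log_unknown"

print(to_table_name("openbmclapi_2025-03-13_020002_log.log"))  # log_2025_03_13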

Improvement goals

  1. Rename every table to the format "log_xxxx_xx_xx", e.g. "log_2025_03_13"
  2. Drop the "openbmclapi_" prefix and "020002_log" suffix, keep only the date, and add the "log" prefix so names stay valid
  3. Add an import progress bar (tqdm) to make progress visible
  4. Build indexes after import to speed up queries

  • Single SQLite database: all .log data stored together
  • One table per file: named like log_2025_03_13 (the date is taken from the file name)
  • Auto table + index: each table automatically gets an idx_datetime index
  • Multithreaded import: 4 threads by default, configurable (e.g. 8)
  • Resumable: already-imported tables are skipped
  • Timestamps: the datetime column holds UNIX timestamps (integers)
  • Live progress bar: tqdm shows import progress

Python code

Python
import os
import re
import sqlite3
import threading
import calendar
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# -------------------------------
# Log line pattern
# -------------------------------
LOG_PATTERN = re.compile(
    r'::ffff:(?P<ip>[\d\.]+) - - \[(?P<datetime>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>[^?\s]+)(?:\?\S*)? HTTP/[\d\.]+" '
    r'(?P<status>\d+) (?P<size>\d+) "-" "(?P<user_agent>[^"]+)"'
)

MONTH_MAP = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
    'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12
}

lock = threading.Lock()

# -------------------------------
# Helpers
# -------------------------------
def parse_datetime_to_unix(dt_str):
    """Convert a log timestamp string to a UNIX timestamp (the logs are UTC)."""
    try:
        date_part, tz = dt_str.split(' ')
        day, month_str, year_time = date_part.split('/')
        year, time_part = year_time.split(':', 1)
        month = MONTH_MAP[month_str]
        dt = datetime.strptime(f"{year}-{month:02d}-{day} {time_part}", "%Y-%m-%d %H:%M:%S")
        return int(calendar.timegm(dt.timetuple()))
    except Exception:
        return None

def extract_data_from_log(file_path):
    """Parse a single log file."""
    results = []
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            match = LOG_PATTERN.search(line)
            if match:
                g = match.groupdict()
                ts = parse_datetime_to_unix(g["datetime"])
                if ts:
                    results.append({
                        "ip": g["ip"],
                        "datetime": ts,
                        "method": g["method"],
                        "path": g["path"],
                        "status": int(g["status"]),
                        "size": int(g["size"]),
                        "user_agent": g["user_agent"]
                    })
    return results

def extract_table_name_from_filename(filename):
    """
    Extract the date part of a file name, e.g.:
    openbmclapi_2025-03-13_020002_log.log → log_2025_03_13
    """
    name = os.path.splitext(filename)[0]
    m = re.search(r'(\d{4})[-_](\d{2})[-_](\d{2})', name)
    if m:
        year, month, day = m.groups()
        return f"log_{year}_{month}_{day}"
    else:
        # Fallback: sanitize the whole name
        name = re.sub(r'[^0-9a-zA-Z_]', '_', name)
        return f"log_{name}"

# -------------------------------
# SQLite helpers
# -------------------------------
def table_exists(conn, table_name):
    """Check whether a table already exists."""
    c = conn.cursor()
    c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
    return c.fetchone() is not None

def create_table_and_index(conn, table_name):
    """Create the table and its index."""
    with lock:
        c = conn.cursor()
        c.execute(f"""
            CREATE TABLE IF NOT EXISTS "{table_name}" (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip TEXT,
                datetime INTEGER,
                method TEXT,
                path TEXT,
                status INTEGER,
                size INTEGER,
                user_agent TEXT
            )
        """)
        # Index to speed up time-range queries
        c.execute(f'CREATE INDEX IF NOT EXISTS idx_datetime_{table_name} ON "{table_name}" (datetime);')
        conn.commit()

def insert_data(conn, table_name, data):
    """Insert the extracted records."""
    with lock:
        c = conn.cursor()
        c.executemany(
            f'INSERT INTO "{table_name}" (ip, datetime, method, path, status, size, user_agent) '
            f'VALUES (:ip, :datetime, :method, :path, :status, :size, :user_agent)',
            data
        )
        conn.commit()

# -------------------------------
# Main tasks
# -------------------------------
def process_single_file(db_path, log_file):
    """Process one log file."""
    conn = sqlite3.connect(db_path, check_same_thread=False)
    filename = os.path.basename(log_file)
    table_name = extract_table_name_from_filename(filename)

    # Resume support
    if table_exists(conn, table_name):
        conn.close()
        return 0, table_name, True

    data = extract_data_from_log(log_file)
    if not data:
        conn.close()
        return 0, table_name, False

    create_table_and_index(conn, table_name)
    insert_data(conn, table_name, data)
    conn.close()
    return len(data), table_name, False

def process_logs_multithread(input_dir, db_path, max_workers=4):
    """Process every log file with a thread pool."""
    log_files = [
        os.path.join(root, f)
        for root, _, files in os.walk(input_dir)
        for f in files if f.endswith(".log")
    ]

    total = 0
    skipped = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_single_file, db_path, lf): lf for lf in log_files}
        for future in tqdm(as_completed(futures), total=len(futures), desc="Importing", ncols=100):
            try:
                count, table_name, was_skipped = future.result()
                if was_skipped:
                    skipped += 1
                else:
                    total += count
            except Exception as e:
                print(f"Error while processing {futures[future]}: {e}")

    print(f"\nImport finished: {total} records imported, {skipped} existing tables skipped.")
    print(f"Database file: {db_path}")

# -------------------------------
# Entry point
# -------------------------------
if __name__ == "__main__":
    input_dir = input("Log folder path: ").strip()
    db_path = input("SQLite database path (e.g. D:\\logs.db): ").strip()
    threads = input("Number of worker threads (default 4): ").strip()
    threads = int(threads) if threads.isdigit() else 4

    process_logs_multithread(input_dir, db_path, max_workers=threads)

Result

Given log file names like:

openbmclapi_2025-03-13_020002_log.log
openbmclapi_2025-03-14_031245_log.log
openbmclapi_2025-03-15_041556_log.log

the generated SQLite tables are:

log_2025_03_13
log_2025_03_14
log_2025_03_15

and each table carries its own index:

idx_datetime_log_2025_03_13
idx_datetime_log_2025_03_14
idx_datetime_log_2025_03_15

Fifth Attempt

Time to be a bit lazy: the SQLite output path used to require typing a file name (e.g. D:\logs.db); now the file name is fixed and only the output directory needs to be entered ~
And since the database won't just be written but also read, I added a "combined query view": an all_logs_view that queries across all tables at once, with no per-table wiring.

Improvement goals

  1. Process .log files in parallel with multiple threads
  2. Resumable: already-imported files are skipped automatically
  3. Auto-generated table names: openbmclapi_2025-03-13_020002_log.log → table log_2025_03_13
  4. The SQLite database file name is fixed: openbmclapi.db
  5. The user only enters the log directory
  6. The database path is joined automatically from the output directory
  7. Dates are converted to UNIX timestamps (easy date filtering)
  8. A combined view all_logs_view is generated for cross-table queries
  9. Indexes are created automatically for query performance
  10. Import progress bar (tqdm)

Python code

Python
import os
import re
import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from datetime import datetime

# Fixed database file name
DB_NAME = "openbmclapi.db"

# Regex matching the access-log line format
LOG_PATTERN = re.compile(
    r'::ffff:(?P<ip>[\d\.]+) - - \[(?P<datetime>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>[^?\s]+)(?:\?\S*)? HTTP/[\d\.]+" '
    r'(?P<status>\d+) (?P<size>\d+) "-" "(?P<user_agent>[^"]+)"'
)

# Timestamp parsing
def parse_datetime_to_unix(dt_str: str) -> int:
    try:
        # e.g. "01/May/2025:02:00:05 +0000"; %z consumes the UTC offset
        return int(datetime.strptime(dt_str, "%d/%b/%Y:%H:%M:%S %z").timestamp())
    except Exception:
        return 0

# Table name from file name: log_YYYY_MM_DD
def extract_table_name(filename: str) -> str:
    match = re.search(r"_(\d{4})-(\d{2})-(\d{2})_", filename)
    if match:
        return f"log_{match.group(1)}_{match.group(2)}_{match.group(3)}"
    return "log_unknown"

# Extract log records
def extract_log_data(file_path):
    rows = []
    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            m = LOG_PATTERN.search(line)
            if m:
                gd = m.groupdict()
                rows.append((
                    gd["ip"],
                    parse_datetime_to_unix(gd["datetime"]),
                    gd["method"],
                    gd["path"],
                    int(gd["status"]),
                    int(gd["size"]),
                    gd["user_agent"],
                    os.path.basename(file_path)
                ))
    return rows

# Create the table (if missing)
def create_table_if_not_exists(conn, table_name):
    conn.execute(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ip TEXT,
            datetime INTEGER,
            method TEXT,
            path TEXT,
            status INTEGER,
            size INTEGER,
            user_agent TEXT,
            filename TEXT
        )
    """)
    # Indexes to speed up queries
    conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{table_name}_datetime ON {table_name}(datetime)")
    conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{table_name}_ip ON {table_name}(ip)")
    conn.commit()

# Insert rows
def insert_logs(conn, table_name, rows):
    if not rows:
        return
    conn.executemany(f"""
        INSERT INTO {table_name} (ip, datetime, method, path, status, size, user_agent, filename)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, rows)
    conn.commit()

# Has this file been imported already?
def is_file_imported(conn, filename):
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'log_%'"
    )
    for (table_name,) in cursor.fetchall():
        res = conn.execute(f"SELECT 1 FROM {table_name} WHERE filename=? LIMIT 1", (filename,)).fetchone()
        if res:
            return True
    return False

# Process one log file
def process_single_log(conn_path, file_path):
    conn = sqlite3.connect(conn_path)
    filename = os.path.basename(file_path)
    if is_file_imported(conn, filename):
        conn.close()
        return f"Skipped (already imported): {filename}"

    table_name = extract_table_name(filename)
    create_table_if_not_exists(conn, table_name)
    rows = extract_log_data(file_path)
    insert_logs(conn, table_name, rows)
    conn.close()
    return f"Imported: {filename} ({len(rows)} rows)"

# Build the combined view
def create_combined_view(conn):
    cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'log_%'")
    tables = [row[0] for row in cursor.fetchall()]
    if not tables:
        return

    union_sql = " UNION ALL ".join([f"SELECT * FROM {t}" for t in tables])
    conn.execute("DROP VIEW IF EXISTS all_logs_view")
    conn.execute(f"CREATE VIEW all_logs_view AS {union_sql}")
    conn.commit()

# Entry point
def main():
    input_dir = input("Log folder path: ").strip()
    output_dir = input("Output folder path (for openbmclapi.db): ").strip()

    db_path = os.path.join(output_dir, DB_NAME)
    os.makedirs(output_dir, exist_ok=True)

    conn = sqlite3.connect(db_path)
    conn.close()

    log_files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith(".log")]
    print(f"Found {len(log_files)} log files\n")

    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_single_log, db_path, f): f for f in log_files}
        for future in tqdm(as_completed(futures), total=len(futures), desc="Importing", unit="file"):
            tqdm.write(future.result())  # surface the per-file status messages

    # Build the combined view
    conn = sqlite3.connect(db_path)
    create_combined_view(conn)
    conn.close()

    print(f"\nAll done! Database written to: {db_path}")
    print("Combined query view all_logs_view created; cross-table queries are available.")

if __name__ == "__main__":
    main()

Sixth Attempt

No good; big problem!
Inspecting the exported database, the dates don't line up: the logs run from 2025-03-13 to 2025-11-02, yet only 16 tables were exported, most of them empty; the few with data look incomplete too. In short: "some empty tables, wrong table count".
After a moment's thought, the culprit seems clear: multiple threads writing the same SQLite file concurrently trip its locking.

SQLite locks at the whole-database level (not per table):

  • Multiple connections writing one .db file, even to different tables, still take the RESERVED/EXCLUSIVE locks;
  • While thread A is committing, thread B blocks or even errors out;
  • Under threads, Python's sqlite3 doesn't wait very long by default, which can produce:
    • interrupted writes
    • empty tables
    • lost data
    • connections closed before a table was committed

In the end I went back to single-threaded to avoid the lock contention (sometimes being clever backfires).
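
For the record, had I wanted to keep the thread pool, the usual mitigation is SQLite's WAL journal mode plus a generous busy timeout; a minimal sketch (open_log_writer is a hypothetical helper, and WAL still allows only one writer at a time, so going single-threaded stays the simpler fix):

Python
import sqlite3

def open_log_writer(db_path: str) -> sqlite3.Connection:
    """One connection per worker, tuned so writers wait instead of erroring out."""
    conn = sqlite3.connect(db_path, timeout=30)  # block up to 30 s on a locked database
    conn.execute("PRAGMA journal_mode=WAL")      # readers no longer block the writer
    conn.execute("PRAGMA synchronous=NORMAL")    # common WAL pairing; fewer fsyncs
    return conn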

Python code

Python
import os
import re
import sqlite3
from datetime import datetime
from tqdm import tqdm

# Fixed database file name
DB_NAME = "openbmclapi.db"

# Regex matching the access-log line format
LOG_PATTERN = re.compile(
    r'::ffff:(?P<ip>[\d.]+) - - \[(?P<datetime>[^]]+)] '
    r'"(?P<method>\S+) (?P<path>[^?\s]+)(?:\?\S*)? HTTP/[\d.]+" '
    r'(?P<status>\d+) (?P<size>\d+) "-" "(?P<user_agent>[^"]+)"'
)

# Convert "01/May/2025:02:00:05 +0000" to a UNIX timestamp
def parse_datetime_to_unix(dt_str: str) -> int:
    try:
        # %z consumes the "+0000" offset, so the result is proper UTC
        return int(datetime.strptime(dt_str, "%d/%b/%Y:%H:%M:%S %z").timestamp())
    except Exception:
        return 0

# Table name from file name: log_YYYY_MM_DD
def extract_table_name(filename: str) -> str:
    match = re.search(r"_(\d{4})-(\d{2})-(\d{2})_", filename)
    if match:
        return f"log_{match.group(1)}_{match.group(2)}_{match.group(3)}"
    return "log_unknown"

# Extract log records
def extract_log_data(file_path):
    rows = []
    with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            m = LOG_PATTERN.search(line)
            if m:
                gd = m.groupdict()
                rows.append((
                    gd["ip"],
                    parse_datetime_to_unix(gd["datetime"]),
                    gd["method"],
                    gd["path"],
                    int(gd["status"]),
                    int(gd["size"]),
                    gd["user_agent"],
                    os.path.basename(file_path)
                ))
    return rows

# Create the table (if missing)
def create_table_if_not_exists(conn, table_name):
    conn.execute(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ip TEXT,
            datetime INTEGER,
            method TEXT,
            path TEXT,
            status INTEGER,
            size INTEGER,
            user_agent TEXT,
            filename TEXT
        )
    """)
    conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{table_name}_datetime ON {table_name}(datetime)")
    conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{table_name}_ip ON {table_name}(ip)")
    conn.commit()

# Has this file been imported already?
def is_file_imported(conn, filename):
    cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'log_%'")
    for (table_name,) in cursor.fetchall():
        res = conn.execute(f"SELECT 1 FROM {table_name} WHERE filename=? LIMIT 1", (filename,)).fetchone()
        if res:
            return True
    return False

# Single-threaded: import files one after another
def process_logs_serial(input_dir, db_path):
    conn = sqlite3.connect(db_path)
    log_files = [f for f in os.listdir(input_dir) if f.endswith(".log")]

    for filename in tqdm(log_files, desc="Importing", unit="file"):
        file_path = os.path.join(input_dir, filename)

        if is_file_imported(conn, filename):
            tqdm.write(f"Skipped (already imported): {filename}")
            continue

        table_name = extract_table_name(filename)
        rows = extract_log_data(file_path)
        if not rows:
            tqdm.write(f"No valid data: {filename}")
            continue

        create_table_if_not_exists(conn, table_name)
        conn.executemany(f"""
            INSERT INTO {table_name}
            (ip, datetime, method, path, status, size, user_agent, filename)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, rows)
        conn.commit()
        tqdm.write(f"Imported: {filename} ({len(rows)} rows)")

    conn.close()

# Build the cross-table query view
def create_combined_view(db_path):
    conn = sqlite3.connect(db_path)
    cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'log_%'")
    tables = [row[0] for row in cursor.fetchall()]
    if not tables:
        print("No log tables available; view not created.")
        conn.close()
        return

    union_sql = " UNION ALL ".join([f"SELECT * FROM {t}" for t in tables])
    conn.execute("DROP VIEW IF EXISTS all_logs_view")
    conn.execute(f"CREATE VIEW all_logs_view AS {union_sql}")
    conn.commit()
    conn.close()
    print("Combined query view all_logs_view created.")

def main():
    input_dir = input("Log folder path: ").strip()
    output_dir = input("Output folder path: ").strip()

    os.makedirs(output_dir, exist_ok=True)
    db_path = os.path.join(output_dir, DB_NAME)

    print(f"Output database: {db_path}")
    process_logs_serial(input_dir, db_path)
    create_combined_view(db_path)
    print(f"\nImport complete! Database file at: {db_path}")

if __name__ == "__main__":
    main()

Summary of changes

  • Multithreading removed; single-threaded sequential execution
  • Lock contention gone: all writes go through one connection
  • Schema unchanged: tables keep the log_YYYY_MM_DD naming
  • Timestamps stored as UNIX timestamps
  • Resumable: already-imported files are detected and skipped
  • Cross-table view all_logs_view rebuilt automatically

Verification queries

SQL
-- Count the log tables
SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name LIKE 'log_%';

-- Peek at the data for 2025-03-13
SELECT * FROM log_2025_03_13 LIMIT 5;

-- Cross-table query: requests from the last 7 days
SELECT * FROM all_logs_view
WHERE datetime > strftime('%s', 'now', '-7 day');

TODO

  1. Convert to PostgreSQL (testing above used SQLite; production will use PostgreSQL — see the sketch below)
  2. Write the API
  3. Deploy the API
  4. Test the API
  5. Launch the API
  6. Web UI design (Next)
  7. Web design
  8. Web development
  9. Web testing
  10. Web launch
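
For item 1, a rough sketch of what the SQLite → PostgreSQL copy might look like, assuming the psycopg2 driver and a placeholder DSN; it reads everything through the all_logs_view created above:

Python
import sqlite3
import psycopg2  # assumed driver; any PostgreSQL client would do

SQLITE_PATH = "openbmclapi.db"               # produced by the script above
PG_DSN = "dbname=openbmclapi user=postgres"  # placeholder DSN

src = sqlite3.connect(SQLITE_PATH)
dst = psycopg2.connect(PG_DSN)
cur = dst.cursor()

# One flat table on the PostgreSQL side; BIGINT holds the UNIX timestamp
cur.execute("""
    CREATE TABLE IF NOT EXISTS logs (
        id BIGSERIAL PRIMARY KEY,
        ip TEXT,
        datetime BIGINT,
        method TEXT,
        path TEXT,
        status INTEGER,
        size BIGINT,
        user_agent TEXT,
        filename TEXT
    )
""")

rows = src.execute(
    "SELECT ip, datetime, method, path, status, size, user_agent, filename FROM all_logs_view"
).fetchall()  # fine for millions of rows; batch with fetchmany() if memory is tight

cur.executemany(
    "INSERT INTO logs (ip, datetime, method, path, status, size, user_agent, filename) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
    rows
)
dst.commit()
src.close()
dst.close()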

Wrapping Up

Honestly a bit tempted to abandon this project QAQ
