In-Depth Review of the RDS AI Assistant - Scenario 4: CPU Stress Test Guide

Introduction: The RDS AI Assistant for Alibaba Cloud ApsaraDB RDS is now officially available! Through a chat interface it handles a whole range of database O&M tasks for you, such as information lookup, problem diagnosis, slow SQL optimization, and instance inspection, and it can even be tailored into a personalized agent that fits your business. It is currently in free public preview; feel free to leave your feedback: https://survey.aliyun.com/apps/zhiliao/m3RVhe0m4

Environment Preparation

  • RDS MySQL: create an instance of the rds.mysql.s2.large class (2 cores, 4 GB RAM) with 100 GB of storage
  • ECS: an instance with at least 4 cores and 8 GB RAM is recommended

Stress Test Script

Set DB_CONFIG to the connection address and credentials of your RDS MySQL instance, then run the script. After roughly 5 minutes of load, a CPU event notification can be observed on the RDS MySQL instance details page; click the event to run one-click diagnosis.
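The script's only third-party dependency is the PyMySQL driver (pip install pymysql). Before launching the full load, it can be worth running a quick pre-flight connectivity check from the ECS instance; the sketch below is a minimal example and assumes the same placeholder endpoint and credentials that appear in DB_CONFIG further down, so substitute your own values.

import pymysql

# Minimal pre-flight check: use the same values you will put into DB_CONFIG below.
conn = pymysql.connect(
    host='rm-xxx.mysql.rds.aliyuncs.com',
    port=3306,
    user='your_username',
    password='your_password',
    charset='utf8mb4',
)
try:
    with conn.cursor() as cursor:
        cursor.execute("SELECT VERSION()")
        print("Connected to MySQL:", cursor.fetchone()[0])
finally:
    conn.close()

If this prints the server version, the account, whitelist, and network path are in order and the full stress-test script below can be started.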

import math
import pymysql
import random
import time
import threading
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
import signal
import logging
import os
from logging.handlers import RotatingFileHandler
import multiprocessing
# ==================== Logging configuration ====================
def setup_logger():
    """Configure the logging system"""
    # Create the log directory
    log_dir = "logs"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    # Log file name
    log_filename = f"{log_dir}/cpu_high_qps_cause_and_slow_log.log"
    # Configure the root logger
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)  # DEBUG level so that all messages are captured
    # Create the formatter
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    # File handler - rotating log, 10 MB per file, 5 backups kept
    file_handler = RotatingFileHandler(
        log_filename, maxBytes=10 * 1024 * 1024, backupCount=5, encoding='utf-8'
    )
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return log_filename
# ==================== Database configuration ====================
DB_CONFIG = {
    'host': 'rm-xxx.mysql.rds.aliyuncs.com',
    'port': 3306,
    'user': 'your_username',
    'password': 'your_password',
    'charset': 'utf8mb4',
    'autocommit': True,
}
DATABASE_NAME = 'test_db'
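# NOTE: create_database_if_not_exists() below drops this database if it already exists and recreates it from scratch.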
def get_connection(use_database=True):
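    """Open a new pymysql connection, optionally selecting the test database."""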
    config = DB_CONFIG.copy()
    if use_database:
        config['database'] = DATABASE_NAME
    return pymysql.connect(**config)
def create_database_if_not_exists():
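    """Drop the test database if it already exists, then create it fresh."""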
    conn = get_connection(use_database=False)
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"SHOW DATABASES LIKE '{DATABASE_NAME}'")
            result = cursor.fetchone()
            if result:
                # The database already exists, drop it first
                logging.info(f"Database {DATABASE_NAME} already exists, dropping it...")
                cursor.execute(f"DROP DATABASE {DATABASE_NAME}")
                logging.info(f"Database {DATABASE_NAME} dropped")
            # Create the new database
            cursor.execute(f"CREATE DATABASE {DATABASE_NAME} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
            logging.info(f"Database {DATABASE_NAME} created successfully")
        conn.commit()
    except Exception as e:
        logging.error(f"Failed to check/create database: {e}")
        conn.rollback()
        # Optionally retry here or re-raise the exception
        raise e
    finally:
        conn.close()
def drop_database_if_exists():
    """
    删除数据库(如果存在),用于还原数据库状态
    注意:此操作会永久删除数据库中的所有数据,请谨慎使用
    """
    conn = get_connection(use_database=False)
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"SHOW DATABASES LIKE '{DATABASE_NAME}'")
            result = cursor.fetchone()
            if result:
                cursor.execute(f"DROP DATABASE {DATABASE_NAME}")
                logging.info(f"数据库 {DATABASE_NAME} 已删除")
            else:
                logging.info(f"数据库 {DATABASE_NAME} 不存在,无需删除")
        conn.commit()
    except Exception as e:
        logging.error(f"删除数据库失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def create_tables():
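    """Create the e-commerce style schema; orders and user_activity_logs intentionally omit some indexes so range queries slow down under high QPS."""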
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    username VARCHAR(100) NOT NULL UNIQUE,
                    email VARCHAR(255) NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    last_login TIMESTAMP NULL,
                    status ENUM('active', 'inactive', 'suspended') DEFAULT 'active',
                    INDEX idx_username (username),
                    INDEX idx_email (email),
                    INDEX idx_status (status),
                    INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    description TEXT,
                    price DECIMAL(10, 2) NOT NULL,
                    category_id INT NOT NULL,
                    stock_quantity INT DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    is_active BOOLEAN DEFAULT TRUE,
                    FULLTEXT INDEX idx_product_name (name),
                    INDEX idx_category (category_id),
                    INDEX idx_price (price),
                    INDEX idx_created_at (created_at)
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS orders (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    user_id INT NOT NULL,
                    total_amount DECIMAL(10, 2) NOT NULL,
                    status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    shipping_address TEXT,
                    payment_method ENUM('credit_card', 'paypal', 'bank_transfer') DEFAULT 'credit_card',
                    INDEX idx_user_id (user_id)
                    -- No indexes on created_at or total_amount, so range queries become slow under high QPS
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS order_items (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    order_id INT NOT NULL,
                    product_id INT NOT NULL,
                    quantity INT NOT NULL,
                    price DECIMAL(10, 2) NOT NULL,
                    subtotal DECIMAL(10, 2) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_order_id (order_id),
                    INDEX idx_product_id (product_id),
                    INDEX idx_created_at (created_at),
                    FOREIGN KEY (order_id) REFERENCES orders(id) ON DELETE CASCADE,
                    FOREIGN KEY (product_id) REFERENCES products(id) ON DELETE CASCADE
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS user_activity_logs (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    user_id INT NOT NULL,
                    activity_type VARCHAR(100) NOT NULL,
                    description TEXT,
                    ip_address VARCHAR(45),
                    user_agent TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    INDEX idx_user_id (user_id),
                    INDEX idx_activity_type (activity_type)
                    -- No index on created_at, so time-range queries become slow under high QPS
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS product_reviews (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    product_id INT NOT NULL,
                    user_id INT NOT NULL,
                    rating TINYINT NOT NULL CHECK (rating >= 1 AND rating <= 5),
                    title VARCHAR(255),
                    comment TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    is_approved BOOLEAN DEFAULT FALSE,
                    INDEX idx_product_id (product_id),
                    INDEX idx_user_id (user_id),
                    INDEX idx_rating (rating),
                    INDEX idx_created_at (created_at),
                    FULLTEXT INDEX idx_comment (comment),
                    FOREIGN KEY (product_id) REFERENCES products(id) ON DELETE CASCADE,
                    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
                ) ENGINE=InnoDB
            """)
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS categories (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    parent_id INT DEFAULT NULL,
                    description TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                    INDEX idx_parent_id (parent_id),
                    FOREIGN KEY (parent_id) REFERENCES categories(id) ON DELETE SET NULL
                ) ENGINE=InnoDB
            """)
            logging.info("所有表创建成功")
        conn.commit()
    except Exception as e:
        logging.error(f"创建表失败: {e}")
        conn.rollback()
    finally:
        conn.close()
# ==================== Data population functions (large data volumes) ====================
def generate_users(num_users=50000):
    """Generate user data"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            batch_size = 5000
            for i in range(0, num_users, batch_size):
                values = [
                    (f"user{i + j}", f"user{i + j}@example.com", f"hash{random.randint(100000, 999999)}")
                    for j in range(batch_size)
                ]
                cursor.executemany(
                    "INSERT IGNORE INTO users (username, email, password_hash) VALUES (%s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {min(i + batch_size, num_users)}/{num_users} 用户")
        logging.info("用户数据插入完成")
    except Exception as e:
        logging.error(f"插入用户数据失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_categories():
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            categories = [
                ('Electronics', None, 'Electronic devices and accessories'),
                ('Computers', None, 'Computers and laptops'),
                ('Smartphones', 1, 'Smartphones and mobile devices'),
                ('Laptops', 2, 'Laptops and notebooks'),
                ('Tablets', 1, 'Tablets and iPads'),
                ('Cameras', 1, 'Digital cameras and accessories'),
                ('TVs', 1, 'Televisions and home theater'),
                ('Books', None, 'Books and magazines'),
                ('Fiction', 8, 'Fiction books'),
                ('Non-Fiction', 8, 'Non-fiction books'),
                ('Clothing', None, 'Clothing and apparel'),
                ('Men', 11, "Men's clothing"),
                ('Women', 11, "Women's clothing"),
                ('Shoes', 11, 'Shoes and footwear'),
                ('Home', None, 'Home and kitchen'),
                ('Furniture', 15, 'Furniture'),
                ('Kitchen', 15, 'Kitchen appliances'),
                ('Garden', 15, 'Garden and outdoor'),
            ]
            cursor.executemany(
                "INSERT IGNORE INTO categories (name, parent_id, description) VALUES (%s, %s, %s)",
                categories
            )
            conn.commit()
            logging.info("分类数据插入完成")
    except Exception as e:
        logging.error(f"插入分类数据失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_products(num_products=20000):
    """生成更多产品数据"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM categories")
            category_ids = [row[0] for row in cursor.fetchall()]
            batch_size = 2000
            product_names = ["Pro", "Elite", "Premium", "Advanced", "Standard", "Basic", "Deluxe", "Ultimate"]
            product_types = ["Laptop", "Smartphone", "Tablet", "Camera", "TV", "Monitor", "Headphones", "Speaker"]
            for i in range(0, num_products, batch_size):
                values = []
                for _ in range(batch_size):
                    name = f"{random.choice(product_names)} {random.choice(product_types)} {random.randint(1, 10000)}"
                    price = round(random.uniform(10, 2000), 2)
                    values.append(
                        (name, f"Description of {name}", price, random.choice(category_ids), random.randint(0, 1000)))
                cursor.executemany(
                    "INSERT IGNORE INTO products (name, description, price, category_id, stock_quantity) VALUES (%s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {min(i + batch_size, num_products)}/{num_products} 产品")
        logging.info("产品数据插入完成")
    except Exception as e:
        logging.error(f"插入产品数据失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_orders(num_orders=100_000):
    """生成大量订单"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM users")
            user_ids = [row[0] for row in cursor.fetchall()]
            cursor.execute("SELECT id, price FROM products")
            products = {row[0]: row[1] for row in cursor.fetchall()}
            product_ids = list(products.keys())
            batch_size = 10000
            start_date = datetime.now() - timedelta(days=365)
            for i in range(0, num_orders, batch_size):
                values = []
                for _ in range(batch_size):
                    user_id = random.choice(user_ids)
                    total = round(sum(products[random.choice(product_ids)] * random.randint(1, 3) for _ in range(3)), 2)
                    created_at = start_date + timedelta(
                        days=random.randint(0, 365),
                        hours=random.randint(0, 23),
                        minutes=random.randint(0, 59)
                    )
                    values.append((
                        user_id, total, random.choice(['pending', 'paid', 'shipped', 'delivered', 'cancelled']),
                        created_at, created_at, 'Address',
                        'credit_card'))
                cursor.executemany(
                    "INSERT INTO orders (user_id, total_amount, status, created_at, updated_at, shipping_address, payment_method) VALUES (%s, %s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {min(i + batch_size, num_orders)}/{num_orders} 订单")
        logging.info("订单数据插入完成")
    except Exception as e:
        logging.error(f"插入订单数据失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_order_items():
    """生成订单项"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM orders")
            order_ids = [row[0] for row in cursor.fetchall()]
            cursor.execute("SELECT id, price FROM products")
            products = {row[0]: row[1] for row in cursor.fetchall()}
            batch_size = 50000
            values = []
            for order_id in order_ids:
                for _ in range(random.randint(1, 5)):
                    product_id = random.choice(list(products.keys()))
                    qty = random.randint(1, 3)
                    price = products[product_id]
                    values.append((order_id, product_id, qty, price, qty * price, datetime.now()))
                    if len(values) >= batch_size:
                        cursor.executemany(
                            "INSERT INTO order_items (order_id, product_id, quantity, price, subtotal, created_at) VALUES (%s, %s, %s, %s, %s, %s)",
                            values
                        )
                        conn.commit()
                        logging.info(f"已插入 {len(values)} 订单项")
                        values = []
            if values:
                cursor.executemany(
                    "INSERT INTO order_items (order_id, product_id, quantity, price, subtotal, created_at) VALUES (%s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {len(values)} 订单项")
        logging.info("订单项数据插入完成")
    except Exception as e:
        logging.error(f"插入订单项数据失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_product_reviews(num_reviews=200000):
    """生成大量评论"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM users")
            user_ids = [row[0] for row in cursor.fetchall()]
            cursor.execute("SELECT id FROM products")
            product_ids = [row[0] for row in cursor.fetchall()]
            batch_size = 10000
            titles = ["Great!", "Good", "Ok", "Bad", "Excellent"]
            comments = ["Good product.", "Nice.", "Could be better.", "Excellent quality."]
            for i in range(0, num_reviews, batch_size):
                values = [
                    (
                        random.choice(product_ids),
                        random.choice(user_ids),
                        random.randint(1, 5),
                        random.choice(titles),
                        random.choice(comments),
                        datetime.now() - timedelta(days=random.randint(0, 180)),
                        datetime.now() - timedelta(days=random.randint(0, 180)),
                        random.random() > 0.2
                    )
                    for _ in range(batch_size)
                ]
                cursor.executemany(
                    "INSERT INTO product_reviews (product_id, user_id, rating, title, comment, created_at, updated_at, is_approved) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {min(i + batch_size, num_reviews)}/{num_reviews} 评论")
        logging.info("产品评论数据插入完成")
    except Exception as e:
        logging.error(f"插入评论失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def generate_user_activity_logs(num_logs=500000):
    """生成大量日志"""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT id FROM users")
            user_ids = [row[0] for row in cursor.fetchall()]
            batch_size = 50000
            types = ['login', 'view_product', 'add_to_cart', 'purchase']
            ips = ['192.168.1.1', '10.0.0.1', '172.16.0.1']
            agents = ['Mozilla/5.0', 'iPhone', 'Android']
            for i in range(0, num_logs, batch_size):
                values = [
                    (
                        random.choice(user_ids),
                        random.choice(types),
                        f"Action: {random.choice(types)}",
                        random.choice(ips),
                        random.choice(agents),
                        datetime.now() - timedelta(days=random.randint(0, 90))
                    )
                    for _ in range(batch_size)
                ]
                cursor.executemany(
                    "INSERT INTO user_activity_logs (user_id, activity_type, description, ip_address, user_agent, created_at) VALUES (%s, %s, %s, %s, %s, %s)",
                    values
                )
                conn.commit()
                logging.info(f"已插入 {min(i + batch_size, num_logs)}/{num_logs} 日志")
        logging.info("日志数据插入完成")
    except Exception as e:
        logging.exception(f"插入日志失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def populate_database():
    # Populate all tables with test data
    logging.info("Starting data population...")
    generate_users(50_000)
    generate_categories()
    generate_products(20_000)
    generate_orders(100_000)
    generate_order_items()
    generate_product_reviews(200_000)
    generate_user_activity_logs(500_000)
    logging.info("数据填充完成")
# ==================== 查询函数(快速查询,产生高QPS)====================
def normal_queries(query_type):
    """快速查询,产生高QPS,增加CPU压力"""
    # 简化:每个线程一个长连接
    conn = get_connection()
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    try:
        # Map the thread index onto query types 1-10
        query_type = (query_type % 10) + 1
        while True:
            start_time = time.time()
            try:
                if query_type == 1:
                    user_id = random.randint(1, 50000)
                    cursor.execute("SELECT id, username, status FROM users WHERE id = %s", (user_id,))
                    cursor.fetchone()
                elif query_type == 2:
                    cursor.execute("SELECT id, username FROM users WHERE status = %s LIMIT 10",
                                   (random.choice(['active', 'inactive']),))
                    cursor.fetchall()
                elif query_type == 3:
                    order_id = random.randint(1, 100000)
                    cursor.execute(
                        "SELECT o.id, o.total_amount, o.status, u.username FROM orders o JOIN users u ON o.user_id = u.id WHERE o.id = %s",
                        (order_id,))
                    cursor.fetchone()
                elif query_type == 4:
                    keyword = random.choice(['Pro', 'Smart', 'Laptop'])
                    cursor.execute(
                        "SELECT id, name, price FROM products WHERE MATCH(name) AGAINST (%s IN NATURAL LANGUAGE MODE) LIMIT 5",
                        (keyword,))
                    cursor.fetchall()
                elif query_type == 5:
                    product_id = random.randint(1, 20000)
                    cursor.execute("SELECT id, name, price, category_id FROM products WHERE id = %s", (product_id,))
                    cursor.fetchone()
                elif query_type == 6:
                    username = f"user{random.randint(0, 49999)}"
                    cursor.execute("SELECT id, username, email FROM users WHERE username = %s", (username,))
                    cursor.fetchone()
                elif query_type == 7:
                    order_item_id = random.randint(1, 300000)
                    cursor.execute("SELECT id, order_id, product_id, quantity FROM order_items WHERE id = %s",
                                   (order_item_id,))
                    cursor.fetchone()
                elif query_type == 8:
                    # Simple JOIN query to add CPU pressure
                    user_id = random.randint(1, 50000)
                    cursor.execute(
                        "SELECT u.id, u.username, COUNT(o.id) as order_count FROM users u LEFT JOIN orders o ON u.id = o.user_id WHERE u.id = %s GROUP BY u.id, u.username",
                        (user_id,))
                    cursor.fetchone()
                elif query_type == 9:
                    # Simple aggregate query to add CPU pressure
                    cursor.execute(
                        "SELECT status, COUNT(*) as cnt FROM orders WHERE status IN ('cancelled') GROUP BY status LIMIT 5")
                    cursor.fetchall()
                elif query_type == 10:
                    # Simple range query to add CPU pressure
                    product_id = random.randint(1, 20000)
                    cursor.execute("SELECT id, name, price FROM products WHERE id BETWEEN %s AND %s LIMIT 10",
                                   (product_id, product_id + 100))
                    cursor.fetchall()
            except Exception as e:
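                # Roll back and keep looping on query errors so the load generation keeps running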
                try:
                    conn.rollback()
                except:
                    pass
                continue
            finally:
                end_time = time.time()
                if end_time - start_time > 2:
                    logging.info(f"查询类型: {query_type}, 耗时: {end_time - start_time:.2f}s")
    finally:
        cursor.close()
        conn.close()
def start_worker_process(concurrent_tasks):
    """工作进程函数,创建指定数量的线程执行任务"""
    logging.info(f"工作进程启动,创建 {concurrent_tasks} 个线程")
    with ThreadPoolExecutor(max_workers=concurrent_tasks) as executor:
        # 提交normal_queries任务
        futures = [executor.submit(normal_queries, i) for i in range(concurrent_tasks)]
        # 等待所有任务完成
        for future in futures:
            try:
                future.result()
            except Exception as e:
                logging.error(f"任务执行出错: {e}")
# ==================== 主程序 ====================
if __name__ == "__main__":
    # Set up logging
    log_file = setup_logger()
    logging.info(f"Log file created: {log_file}")
    logging.info(f"Run 'tail -f {log_file}' to follow the log in real time")
    exit_signal = False
    def signal_handler(signum, frame):
        global exit_signal
        logging.info("Exit signal received, the program will terminate...")
        exit_signal = True
    signal.signal(signal.SIGINT, signal_handler)
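    # Set is_create to anything other than 'y' to skip rebuilding the schema and repopulating the test data.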
    is_create = 'y'
    if is_create == 'y':
        logging.info("########## 开始创建数据库数据... ##########")
        create_database_if_not_exists()
        # 总是创建表,确保表存在
        logging.info("########## 开始创建表... ##########")
        create_tables()
        logging.info("########## 开始填充数据... ##########")
        populate_database()  # 填充大数据量
    logging.info("########## 开始准备进行场景构造... ##########")
    # 计算需要的进程数, mysql rds.mysql.s2.large规格,400并发 = 61%
    cpu_usage_ratio = 0.9
    total_concurrent = math.ceil(cpu_usage_ratio / (0.61/400))
    concurrency_per_process = 200
    num_processes = (total_concurrent + concurrency_per_process - 1) // concurrency_per_process  # round up
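    # Worked example with the values above:
    #   total_concurrent = ceil(0.9 / (0.61 / 400)) = ceil(590.2) = 591 threads in total
    #   num_processes    = ceil(591 / 200) = 3 (two processes with 200 threads each, one with 191)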
    logging.info(f"将启动 {num_processes} 个进程,每个进程处理 {concurrency_per_process} 个并发任务,目标CPU使用率 {cpu_usage_ratio*100}%")
    # 创建并启动多个进程
    processes = []
    for i in range(num_processes):
        # Concurrency for this process
        current_concurrency = min(concurrency_per_process, total_concurrent - i * concurrency_per_process)
        # Create the process
        p = multiprocessing.Process(target=start_worker_process, args=(current_concurrency,))
        p.start()
        processes.append(p)
        logging.info(f"已启动进程 {p.pid},处理 {current_concurrency} 个并发任务")
    try:
        # Monitor the running processes
        while not exit_signal:
            logging.info(f"Load generation in progress, current time: {time.strftime('%Y-%m-%d %H:%M:%S')}")
            time.sleep(5)
        # Terminate all processes
        for p in processes:
            p.terminate()
            p.join()
        logging.info("所有进程已终止,程序结束")
    except KeyboardInterrupt:
        logging.info("收到键盘中断信号,正在终止所有进程...")
        for p in processes:
            p.terminate()
            p.join()
        logging.info("所有进程已终止")


👉 Welcome to search for group ID 106730017609 on DingTalk and join the discussion

👉 Get started now: log in to the ApsaraDB RDS console

👉 More documentation: RDS AI Assistant

👉 The RDS AI Assistant is now in free public preview; feel free to leave your feedback: https://survey.aliyun.com/apps/zhiliao/m3RVhe0m4
