当你已经掌握了C语言的基础语法和进阶知识,能够编写复杂的数据结构和系统程序时,真正的挑战才刚刚开始。C语言高级开发要求你深入理解操作系统内核、编译器原理、硬件架构、并发模型、网络协议栈、性能优化、安全编程、跨平台开发等前沿领域。本文将系统梳理C语言高级开发的核心知识体系,涵盖内核编程、并发与同步、网络编程、编译器与链接、性能极致优化、安全编程、跨平台开发、调试与逆向等深度领域,助你完成从优秀开发者到系统级专家的跨越。
第一篇:操作系统内核编程
1.1 Linux内核模块开发基础
Linux内核模块是C语言高级开发的重要领域,它允许开发者动态扩展内核功能,无需重新编译整个内核。
/**
* 完整的Linux内核模块示例
* 文件:my_kernel_module.c
*
* 编译命令:
* make -C /lib/modules/$(uname -r)/build M=$(pwd) modules
*
* 安装:sudo insmod my_kernel_module.ko
* 查看:lsmod | grep my_kernel_module
* 卸载:sudo rmmod my_kernel_module
* 查看日志:dmesg | tail
*/
#include <linux/init.h>
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/fs.h>
#include <linux/uaccess.h>
#include <linux/slab.h>
#include <linux/device.h>
#include <linux/cdev.h>
#include <linux/kthread.h>
#include <linux/delay.h>
MODULE_LICENSE("GPL");
MODULE_AUTHOR("Advanced C Developer");
MODULE_DESCRIPTION("Advanced Kernel Module Demo");
MODULE_VERSION("1.0");
/* ========== Module parameters ========== */
/* Verbosity threshold for dprintk(); settable at load time and via sysfs (0644). */
static int debug_level = 0;
module_param(debug_level, int, 0644);
MODULE_PARM_DESC(debug_level, "Debug level (0-3)");
/* Name used for the char device, device class, /dev node and /proc entry. */
static char *device_name = "my_device";
module_param(device_name, charp, 0644);
MODULE_PARM_DESC(device_name, "Device name");
/* ========== Device structure ========== */
/*
 * Per-device state for the demo character device. Mutable fields are
 * protected by @lock; readers sleep on @read_queue until @data_ready
 * is raised by my_write() or the worker kthread.
 */
struct my_device_data {
    struct cdev cdev;               /* char device registration */
    struct class *class;            /* sysfs class for the /dev node */
    struct device *device;          /* node created via device_create() */
    dev_t dev_num;                  /* allocated major:minor */
    char *buffer;                   /* kmalloc'd data buffer */
    size_t buffer_size;             /* allocation size (my_write) / data length (worker) */
    int open_count;                 /* concurrent open() count */
    struct mutex lock;              /* guards the fields above */
    wait_queue_head_t read_queue;   /* readers wait here for data_ready */
    int data_ready;                 /* consumed and cleared by my_read() */
};
static struct my_device_data *g_dev;    /* single global device instance */
/* ========== Debug macros ========== */
#define DEBUG_ERR 1
#define DEBUG_WARN 2
#define DEBUG_INFO 3
/* Emit at KERN_DEBUG only when the debug_level module parameter is at
 * or above @level; do/while(0) keeps it statement-safe in if/else. */
#define dprintk(level, fmt, ...) \
do { \
    if (debug_level >= level) \
        printk(KERN_DEBUG "my_module: " fmt, ##__VA_ARGS__); \
} while (0)
/* ========== File operation handlers ========== */
/*
 * open(): recover the device instance from the inode's embedded cdev,
 * stash it for the other fops, and count the open.
 */
static int my_open(struct inode *inode, struct file *filp) {
    struct my_device_data *dev;

    dev = container_of(inode->i_cdev, struct my_device_data, cdev);
    filp->private_data = dev;

    mutex_lock(&dev->lock);
    dev->open_count++;
    mutex_unlock(&dev->lock);

    dprintk(DEBUG_INFO, "Device opened, count=%d\n", dev->open_count);
    return 0;
}
/* release(): drop the open count recorded by my_open(). */
static int my_release(struct inode *inode, struct file *filp) {
    struct my_device_data *dev = filp->private_data;

    mutex_lock(&dev->lock);
    dev->open_count--;
    mutex_unlock(&dev->lock);

    dprintk(DEBUG_INFO, "Device closed, count=%d\n", dev->open_count);
    return 0;
}
/*
 * read(): sleep until data_ready, then copy from the kernel buffer to
 * userspace starting at *fpos. Once the reader has consumed up to
 * buffer_size, data_ready is cleared so the next read blocks again.
 * Returns bytes read, 0 at EOF, -ERESTARTSYS on signal, -EFAULT on a
 * bad user pointer.
 *
 * NOTE(review): the wait condition reads dev->data_ready without
 * dev->lock; tolerable for a flag re-checked under the lock by the
 * offset test below, but confirm against the writer side.
 */
static ssize_t my_read(struct file *filp, char __user *buf, size_t count, loff_t *fpos) {
    struct my_device_data *dev = filp->private_data;
    size_t to_read;
    int ret;
    // Wait until data is available (interruptible by signals)
    ret = wait_event_interruptible(dev->read_queue, dev->data_ready);
    if (ret) {
        return -ERESTARTSYS;
    }
    mutex_lock(&dev->lock);
    if (*fpos >= dev->buffer_size) {
        mutex_unlock(&dev->lock);
        return 0;   /* EOF */
    }
    to_read = min(count, dev->buffer_size - *fpos);
    if (copy_to_user(buf, dev->buffer + *fpos, to_read)) {
        mutex_unlock(&dev->lock);
        return -EFAULT;
    }
    *fpos += to_read;
    if (*fpos >= dev->buffer_size) {
        dev->data_ready = 0;   /* fully consumed; future reads block */
    }
    mutex_unlock(&dev->lock);
    dprintk(DEBUG_INFO, "Read %zu bytes at offset %lld\n", to_read, *fpos - to_read);
    return to_read;
}
/*
 * write(): copy from userspace into the kernel buffer at *fpos,
 * growing the buffer (with 4 KB slack) when the write extends past the
 * current allocation, then wake any blocked readers.
 * Returns bytes written, -ENOMEM, or -EFAULT.
 *
 * FIX: krealloc() replaces the hand-rolled kmalloc+memcpy+kfree
 * sequence (it preserves existing contents and handles a NULL source),
 * and the unused local `ret` is removed.
 */
static ssize_t my_write(struct file *filp, const char __user *buf, size_t count, loff_t *fpos) {
    struct my_device_data *dev = filp->private_data;
    size_t to_write;

    mutex_lock(&dev->lock);
    /* Grow the buffer if the write would run past the end. */
    if (*fpos + count > dev->buffer_size) {
        size_t new_size = *fpos + count + 4096;
        char *new_buffer = krealloc(dev->buffer, new_size, GFP_KERNEL);
        if (!new_buffer) {
            mutex_unlock(&dev->lock);
            return -ENOMEM;
        }
        dev->buffer = new_buffer;
        dev->buffer_size = new_size;
        dprintk(DEBUG_INFO, "Buffer expanded to %zu bytes\n", dev->buffer_size);
    }
    to_write = min(count, dev->buffer_size - *fpos);
    if (copy_from_user(dev->buffer + *fpos, buf, to_write)) {
        mutex_unlock(&dev->lock);
        return -EFAULT;
    }
    *fpos += to_write;
    dev->data_ready = 1;
    wake_up_interruptible(&dev->read_queue);
    mutex_unlock(&dev->lock);
    dprintk(DEBUG_INFO, "Wrote %zu bytes at offset %lld\n", to_write, *fpos - to_write);
    return to_write;
}
/*
 * llseek(): SET/CUR/END semantics relative to the current buffer size,
 * with the result clamped to [0, buffer_size]. Returns the new offset
 * or -EINVAL.
 */
static loff_t my_llseek(struct file *filp, loff_t offset, int whence) {
    struct my_device_data *dev = filp->private_data;
    loff_t pos;

    mutex_lock(&dev->lock);
    switch (whence) {
    case SEEK_SET:
        pos = offset;
        break;
    case SEEK_CUR:
        pos = filp->f_pos + offset;
        break;
    case SEEK_END:
        pos = dev->buffer_size + offset;
        break;
    default:
        mutex_unlock(&dev->lock);
        return -EINVAL;
    }
    if (pos < 0 || pos > dev->buffer_size) {
        mutex_unlock(&dev->lock);
        return -EINVAL;
    }
    filp->f_pos = pos;
    mutex_unlock(&dev->lock);
    return pos;
}
/* File-operations table wiring the handlers above into the cdev. */
static struct file_operations my_fops = {
    .owner = THIS_MODULE,
    .open = my_open,
    .release = my_release,
    .read = my_read,
    .write = my_write,
    .llseek = my_llseek,
};
/* ========== Kernel thread example ========== */
static struct task_struct *worker_thread;
/*
 * Periodic producer: every 10 s writes a timestamp line into the
 * device buffer and wakes blocked readers.
 *
 * FIX: the original used unbounded sprintf() into dev->buffer;
 * scnprintf() bounds the write to the current allocation and returns
 * the data length (what the original recomputed via strlen). Note the
 * original already repurposed buffer_size as "data length" here, so
 * that behavior is kept.
 */
static int worker_function(void *data) {
    struct my_device_data *dev = data;
    while (!kthread_should_stop()) {
        ssleep(10);
        mutex_lock(&dev->lock);
        dev->buffer_size = scnprintf(dev->buffer, dev->buffer_size,
                                     "Kernel thread generated data at %llu\n",
                                     (unsigned long long)ktime_get_real_ns());
        dev->data_ready = 1;
        wake_up_interruptible(&dev->read_queue);
        mutex_unlock(&dev->lock);
        dprintk(DEBUG_INFO, "Kernel thread generated data\n");
    }
    return 0;
}
/* Create and start the worker kthread; returns 0 or a -errno. */
static int start_kernel_thread(struct my_device_data *dev) {
    worker_thread = kthread_create(worker_function, dev, "my_worker");
    if (IS_ERR(worker_thread)) {
        int err = PTR_ERR(worker_thread);
        /* FIX: reset to NULL so stop_kernel_thread() cannot call
         * kthread_stop() on an ERR_PTR value. */
        worker_thread = NULL;
        return err;
    }
    wake_up_process(worker_thread);
    return 0;
}
/* Ask the worker to stop and wait for it; kthread_stop() blocks. */
static void stop_kernel_thread(void) {
    if (worker_thread) {
        kthread_stop(worker_thread);
        worker_thread = NULL;
    }
}
/* ========== procfs interface ========== */
#include <linux/proc_fs.h>
#include <linux/seq_file.h>
static struct proc_dir_entry *proc_entry;
/* seq_file "show": dump device statistics into /proc/<device_name>.
 * m->private carries the my_device_data pointer registered via
 * proc_create_data().
 * NOTE(review): fields are read without dev->lock — stale values are
 * possible; confirm that is acceptable for a stats file. */
static int my_proc_show(struct seq_file *m, void *v) {
    struct my_device_data *dev = m->private;
    seq_printf(m, "Device: %s\n", device_name);
    seq_printf(m, "Buffer size: %zu bytes\n", dev->buffer_size);
    seq_printf(m, "Open count: %d\n", dev->open_count);
    seq_printf(m, "Data ready: %d\n", dev->data_ready);
    seq_printf(m, "Debug level: %d\n", debug_level);
    return 0;
}
/* NOTE(review): PDE_DATA() was renamed pde_data() in Linux 5.17 —
 * confirm against the target kernel. */
static int my_proc_open(struct inode *inode, struct file *file) {
    return single_open(file, my_proc_show, PDE_DATA(inode));
}
/* proc_ops table (kernels >= 5.6; older kernels used file_operations). */
static const struct proc_ops my_proc_ops = {
    .proc_open = my_proc_open,
    .proc_read = seq_read,
    .proc_lseek = seq_lseek,
    .proc_release = single_release,
};
/* ========== Module init / exit ========== */
/*
 * Initialization order: device data -> chrdev region -> cdev -> class
 * -> device node -> buffer -> procfs -> worker thread. The goto chain
 * unwinds in exact reverse order on failure. procfs/kthread failures
 * are deliberately non-fatal (warn and continue).
 *
 * FIX: kzalloc() replaces kmalloc()+memset(0).
 * NOTE(review): class_create() lost its module argument in Linux 6.4 —
 * confirm the target kernel version.
 */
static int __init my_module_init(void) {
    int ret;
    dprintk(DEBUG_INFO, "Initializing kernel module\n");

    g_dev = kzalloc(sizeof(*g_dev), GFP_KERNEL);
    if (!g_dev) {
        printk(KERN_ERR "Failed to allocate device data\n");
        return -ENOMEM;
    }
    mutex_init(&g_dev->lock);
    init_waitqueue_head(&g_dev->read_queue);

    /* Dynamically allocate one char device number. */
    ret = alloc_chrdev_region(&g_dev->dev_num, 0, 1, device_name);
    if (ret) {
        printk(KERN_ERR "Failed to allocate device number\n");
        goto err_free_dev;
    }

    /* Register the character device. */
    cdev_init(&g_dev->cdev, &my_fops);
    g_dev->cdev.owner = THIS_MODULE;
    ret = cdev_add(&g_dev->cdev, g_dev->dev_num, 1);
    if (ret) {
        printk(KERN_ERR "Failed to add cdev\n");
        goto err_unregister_dev;
    }

    /* Device class + /dev node. */
    g_dev->class = class_create(THIS_MODULE, device_name);
    if (IS_ERR(g_dev->class)) {
        ret = PTR_ERR(g_dev->class);
        printk(KERN_ERR "Failed to create class\n");
        goto err_del_cdev;
    }
    g_dev->device = device_create(g_dev->class, NULL, g_dev->dev_num,
                                  NULL, device_name);
    if (IS_ERR(g_dev->device)) {
        ret = PTR_ERR(g_dev->device);
        printk(KERN_ERR "Failed to create device\n");
        goto err_destroy_class;
    }

    /* Seed the data buffer so the first read has something to return. */
    g_dev->buffer = kmalloc(4096, GFP_KERNEL);
    if (!g_dev->buffer) {
        ret = -ENOMEM;
        goto err_destroy_device;
    }
    g_dev->buffer_size = 4096;
    sprintf(g_dev->buffer, "Hello from kernel module!\n");
    g_dev->data_ready = 1;

    /* Non-fatal extras: procfs entry and the worker kthread. */
    proc_entry = proc_create_data(device_name, 0444, NULL, &my_proc_ops, g_dev);
    if (!proc_entry) {
        printk(KERN_WARNING "Failed to create proc entry\n");
    }
    ret = start_kernel_thread(g_dev);
    if (ret) {
        printk(KERN_WARNING "Failed to start kernel thread\n");
    }

    printk(KERN_INFO "Module loaded successfully. Device major:minor = %d:%d\n",
           MAJOR(g_dev->dev_num), MINOR(g_dev->dev_num));
    return 0;

err_destroy_device:
    device_destroy(g_dev->class, g_dev->dev_num);
err_destroy_class:
    class_destroy(g_dev->class);
err_del_cdev:
    cdev_del(&g_dev->cdev);
err_unregister_dev:
    unregister_chrdev_region(g_dev->dev_num, 1);
err_free_dev:
    kfree(g_dev);
    return ret;
}
/*
 * Teardown in reverse order of init. Only runs after a successful
 * init, so g_dev and its members are valid here.
 */
static void __exit my_module_exit(void) {
    dprintk(DEBUG_INFO, "Exiting kernel module\n");

    stop_kernel_thread();
    if (proc_entry) {
        remove_proc_entry(device_name, NULL);
    }
    kfree(g_dev->buffer);   /* kfree(NULL) is a no-op */
    device_destroy(g_dev->class, g_dev->dev_num);
    class_destroy(g_dev->class);
    cdev_del(&g_dev->cdev);
    unregister_chrdev_region(g_dev->dev_num, 1);
    mutex_destroy(&g_dev->lock);
    kfree(g_dev);

    printk(KERN_INFO "Module unloaded\n");
}
module_init(my_module_init);
module_exit(my_module_exit);
1.2 系统调用拦截与Hook技术
/**
* 系统调用拦截示例
* 注意:此代码仅用于学习和安全研究,实际生产环境需要谨慎使用
*
* 原理:通过修改sys_call_table中的函数指针来实现系统调用劫持
*/
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/kallsyms.h>
#include <linux/syscalls.h>
#include <linux/unistd.h>
#include <asm/unistd.h>
MODULE_LICENSE("GPL");
MODULE_AUTHOR("Advanced C Developer");
// Saved pointers: the resolved syscall table and the original handlers
// for open/mkdir, restored by hook_exit().
static unsigned long *sys_call_table;
static asmlinkage long (*original_open)(const char __user *filename, int flags, umode_t mode);
static asmlinkage long (*original_mkdir)(const char __user *pathname, umode_t mode);
// Locate the syscall table address (multiple strategies).
// NOTE(review): kallsyms_lookup_name() is no longer exported to
// modules since Linux 5.7, and vfs_read() is unavailable to modules on
// modern kernels — confirm against the target kernel. Method 2 is an
// unfinished sketch: the parse loop body is empty and `line` is unused.
static unsigned long *get_syscall_table(void) {
    unsigned long *table = NULL;
#ifdef CONFIG_KALLSYMS
    // Method 1: resolve the symbol via kallsyms_lookup_name
    table = (unsigned long *)kallsyms_lookup_name("sys_call_table");
#endif
    // Method 2: parse /proc/kallsyms at runtime
    if (!table) {
        // Look it up at runtime
        struct file *file;
        char *line;
        loff_t pos = 0;
        file = filp_open("/proc/kallsyms", O_RDONLY, 0);
        if (!IS_ERR(file)) {
            char buf[256];
            while (vfs_read(file, buf, sizeof(buf)-1, &pos) > 0) {
                // parse each chunk looking for sys_call_table (not implemented)
            }
            filp_close(file, NULL);
        }
    }
    return table;
}
// Replacement for sys_open: logs every open, denies /etc/shadow, and
// transparently redirects /etc/passwd to a decoy file; all other paths
// fall through to the original handler.
asmlinkage long hooked_open(const char __user *filename, int flags, umode_t mode) {
    char path[256];
    int ret;

    // Pull the user-supplied path into kernel space
    if (strncpy_from_user(path, filename, sizeof(path) - 1) >= 0) {
        path[sizeof(path) - 1] = '\0';
        printk(KERN_INFO "HOOKED open: %s (flags=%x)\n", path, flags);

        // Access control: refuse anything touching /etc/shadow
        if (strstr(path, "/etc/shadow") != NULL) {
            printk(KERN_WARNING "Access denied to %s\n", path);
            return -EACCES;
        }
        // Path redirection: /etc/passwd -> decoy copy
        if (strstr(path, "/etc/passwd") != NULL) {
            const char *safe_path = "/tmp/safe_passwd";
            ret = original_open(safe_path, flags, mode);
            printk(KERN_INFO "Redirected to %s\n", safe_path);
            return ret;
        }
    }
    // Fall through to the real syscall
    return original_open(filename, flags, mode);
}
// Replacement for sys_mkdir: logs the request and refuses directory
// creation anywhere under /tmp/.
// FIX: strncpy_from_user() does NOT NUL-terminate on truncation, so
// the original could printk()/strncmp() an unterminated buffer; the
// terminator is now forced (matching hooked_open).
asmlinkage long hooked_mkdir(const char __user *pathname, umode_t mode) {
    char buffer[256];
    if (strncpy_from_user(buffer, pathname, sizeof(buffer) - 1) >= 0) {
        buffer[sizeof(buffer) - 1] = '\0';
        printk(KERN_INFO "HOOKED mkdir: %s\n", buffer);
        // Deny directory creation under /tmp
        if (strncmp(buffer, "/tmp/", 5) == 0) {
            printk(KERN_WARNING "mkdir denied in /tmp\n");
            return -EACCES;
        }
    }
    return original_mkdir(pathname, mode);
}
// Toggle CR0.WP around syscall-table writes.
// FIX: clear_bit()/set_bit() are atomic bitmap operations meant for
// in-memory bitmaps; for a local scratch copy of CR0, plain bit
// arithmetic is the conventional form.
// NOTE(review): since Linux 5.3 the generic write_cr0() pins the WP
// bit, so this approach needs inline asm on modern kernels — confirm
// the target kernel version.
static void disable_write_protection(void) {
    unsigned long cr0 = read_cr0();
    cr0 &= ~(1UL << 16);   /* clear CR0.WP */
    write_cr0(cr0);
}
static void enable_write_protection(void) {
    unsigned long cr0 = read_cr0();
    cr0 |= (1UL << 16);    /* set CR0.WP */
    write_cr0(cr0);
}
/* Install the hooks: resolve the table, save the original entries for
 * __NR_open/__NR_mkdir, then patch the table with write protection
 * temporarily disabled. */
static int __init hook_init(void) {
    sys_call_table = get_syscall_table();
    if (!sys_call_table) {
        printk(KERN_ERR "Cannot find sys_call_table\n");
        return -ENOSYS;
    }
    printk(KERN_INFO "sys_call_table at %p\n", sys_call_table);
    printk(KERN_INFO "__NR_open = %d\n", __NR_open);
    // Save the original handlers so hook_exit() can restore them
    original_open = (void *)sys_call_table[__NR_open];
    original_mkdir = (void *)sys_call_table[__NR_mkdir];
    // Patch the table; its pages are write-protected, hence the
    // CR0.WP toggle around the stores
    disable_write_protection();
    sys_call_table[__NR_open] = (unsigned long)hooked_open;
    sys_call_table[__NR_mkdir] = (unsigned long)hooked_mkdir;
    enable_write_protection();
    printk(KERN_INFO "System call hooks installed\n");
    return 0;
}
/* Restore the saved handlers so the system is clean after rmmod. */
static void __exit hook_exit(void) {
    // Put the original syscalls back
    disable_write_protection();
    sys_call_table[__NR_open] = (unsigned long)original_open;
    sys_call_table[__NR_mkdir] = (unsigned long)original_mkdir;
    enable_write_protection();
    printk(KERN_INFO "System call hooks removed\n");
}
module_init(hook_init);
module_exit(hook_exit);
1.3 内存管理与slab分配器
/**
* 内核内存管理高级特性
* 包括slab缓存、内存池、大页内存等
*/
#include <linux/slab.h>
#include <linux/mm.h>
#include <linux/hugetlb.h>
#include <linux/gfp.h>
// ========== Custom slab cache ==========
/* Example object allocated from a dedicated kmem_cache. */
struct my_custom_object {
    int id;
    char name[64];
    struct list_head list;   /* linkage for caller-managed lists */
    atomic_t refcount;       /* object is freed when this hits zero */
    void *data;              /* optional kmalloc'd payload, kfree'd on release */
};
static struct kmem_cache *my_object_cache;
/*
 * Create the dedicated slab cache for my_custom_object.
 * With SLAB_PANIC the kernel panics on failure, so the NULL check is
 * belt-and-braces. Returns 0 on success, -ENOMEM otherwise.
 */
int init_object_cache(void) {
    // args: name, object size, alignment, flags, constructor
    my_object_cache = kmem_cache_create("my_object_cache",
                                        sizeof(struct my_custom_object),
                                        0,                              /* default alignment */
                                        SLAB_HWCACHE_ALIGN | SLAB_PANIC,
                                        NULL);                          /* no constructor */
    if (my_object_cache == NULL) {
        printk(KERN_ERR "Failed to create slab cache\n");
        return -ENOMEM;
    }
    printk(KERN_INFO "Created slab cache with object size %zu\n",
           sizeof(struct my_custom_object));
    return 0;
}
/* Allocate a zeroed object from the cache with refcount 1 and an
 * initialized list head. May sleep (GFP_KERNEL); returns NULL on OOM.
 * FIX: kmem_cache_zalloc() replaces kmem_cache_alloc()+memset(). */
struct my_custom_object *alloc_my_object(void) {
    struct my_custom_object *obj;

    obj = kmem_cache_zalloc(my_object_cache, GFP_KERNEL);
    if (obj) {
        atomic_set(&obj->refcount, 1);
        INIT_LIST_HEAD(&obj->list);
    }
    return obj;
}
/* Drop one reference; the last reference releases the payload and
 * returns the object to its slab cache. NULL is tolerated. */
void free_my_object(struct my_custom_object *obj) {
    if (!obj)
        return;
    if (!atomic_dec_and_test(&obj->refcount))
        return;
    if (obj->data) {
        kfree(obj->data);
    }
    kmem_cache_free(my_object_cache, obj);
}
/* Destroy the cache; all objects must have been freed beforehand.
 * The global is NULLed so a second call is harmless. */
void destroy_object_cache(void) {
    if (!my_object_cache)
        return;
    kmem_cache_destroy(my_object_cache);
    my_object_cache = NULL;
}
// ========== Memory pool ==========
#include <linux/mempool.h>
struct my_pool_object {
    int data;
    // ... other fields
};
static mempool_t *my_mempool;
/*
 * Create a mempool that keeps at least 10 objects in reserve so
 * allocations can succeed under memory pressure.
 * NOTE(review): the pool is backed by my_object_cache, which holds
 * struct my_custom_object — a different (larger) type than
 * struct my_pool_object. It works (allocations are merely oversized),
 * but confirm which cache was actually intended.
 */
int init_mempool(void) {
    my_mempool = mempool_create(10,                 // minimum reserved objects
                                mempool_alloc_slab, // allocation callback
                                mempool_free_slab,  // free callback
                                my_object_cache);   // backing slab cache
    if (!my_mempool) {
        printk(KERN_ERR "Failed to create mempool\n");
        return -ENOMEM;
    }
    return 0;
}
struct my_pool_object *alloc_from_pool(void) {
return mempool_alloc(my_mempool, GFP_KERNEL);
}
void free_to_pool(struct my_pool_object *obj) {
mempool_free(obj, my_mempool);
}
void destroy_mempool(void) {
if (my_mempool) {
mempool_destroy(my_mempool);
}
}
// ========== Huge pages ==========
#include <linux/hugetlb.h>
/* Allocate one compound page of order HUGETLB_PAGE_ORDER and return
 * its kernel virtual address, or NULL on failure.
 * NOTE(review): this is a high-order alloc_pages() allocation, not a
 * hugetlbfs mapping — confirm that is the intent. */
void *alloc_huge_page_memory(void) {
    struct page *page;
    void *addr;
    page = alloc_pages(GFP_KERNEL | __GFP_COMP, HUGETLB_PAGE_ORDER);
    if (!page) {
        printk(KERN_ERR "Failed to allocate huge page\n");
        return NULL;
    }
    addr = page_address(page);
    printk(KERN_INFO "Allocated huge page at %p\n", addr);
    return addr;
}
/* Release memory obtained from alloc_huge_page_memory(); @addr must be
 * the address returned by it. */
void free_huge_page_memory(void *addr) {
    struct page *page = virt_to_page(addr);
    __free_pages(page, HUGETLB_PAGE_ORDER);
}
// ========== 内存分配器性能对比 ==========
void memory_allocation_benchmark(void) {
const int iterations = 10000;
int i;
void *ptrs[10000];
unsigned long start, end;
// kmalloc测试
start = jiffies;
for (i = 0; i < iterations; i++) {
ptrs[i] = kmalloc(256, GFP_KERNEL);
}
for (i = 0; i < iterations; i++) {
kfree(ptrs[i]);
}
end = jiffies;
printk(KERN_INFO "kmalloc: %lu jiffies\n", end - start);
// 自定义slab缓存测试
start = jiffies;
for (i = 0; i < iterations; i++) {
ptrs[i] = kmem_cache_alloc(my_object_cache, GFP_KERNEL);
}
for (i = 0; i < iterations; i++) {
kmem_cache_free(my_object_cache, ptrs[i]);
}
end = jiffies;
printk(KERN_INFO "slab cache: %lu jiffies\n", end - start);
// 内存池测试
start = jiffies;
for (i = 0; i < iterations; i++) {
ptrs[i] = mempool_alloc(my_mempool, GFP_KERNEL);
}
for (i = 0; i < iterations; i++) {
mempool_free(ptrs[i], my_mempool);
}
end = jiffies;
printk(KERN_INFO "mempool: %lu jiffies\n", end - start);
}
第二篇:并发与同步深度
2.1 POSIX线程高级编程
/**
* POSIX线程高级特性
* 包括线程池、线程安全、优先级反转、线程局部存储等
*/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <semaphore.h>
#include <signal.h>
#include <sched.h>
// ========== Thread pool implementation ==========
/* One queued unit of work: function pointer + argument, singly linked. */
typedef struct thread_pool_task {
    void (*function)(void *);
    void *argument;
    struct thread_pool_task *next;
} thread_pool_task_t;
/* Pool state: a linked task queue guarded by `lock`, with idle workers
 * sleeping on `notify`. */
typedef struct thread_pool {
    pthread_mutex_t lock;
    pthread_cond_t notify;       /* signaled per task; broadcast on shutdown */
    pthread_t *threads;
    thread_pool_task_t *tasks;   /* head of the task queue */
    int thread_count;            /* workers requested */
    int queue_size;              /* tasks currently queued */
    int shutdown;                /* set once; workers then exit */
    int started;                 /* workers actually created */
} thread_pool_t;
// Worker loop: sleep on the condition variable until a task is queued
// or shutdown is requested; tasks run outside the pool lock.
static void *thread_pool_worker(void *arg) {
    thread_pool_t *pool = arg;

    for (;;) {
        pthread_mutex_lock(&pool->lock);
        // Wait for work or the shutdown flag
        while (!pool->shutdown && pool->tasks == NULL) {
            pthread_cond_wait(&pool->notify, &pool->lock);
        }
        if (pool->shutdown) {
            pthread_mutex_unlock(&pool->lock);
            pthread_exit(NULL);
        }
        // Detach the head task from the queue
        thread_pool_task_t *job = pool->tasks;
        if (job) {
            pool->tasks = job->next;
            pool->queue_size--;
        }
        pthread_mutex_unlock(&pool->lock);

        // Run the task without holding the lock
        if (job) {
            job->function(job->argument);
            free(job);
        }
    }
    return NULL;   /* unreachable; kept for symmetry */
}
/*
 * Create a pool with @thread_count workers (minimum 1). Returns NULL
 * on any failure.
 *
 * FIX: the original called thread_pool_destroy() on partial failure —
 * that function is declared later in the file (implicit declaration,
 * an error since C99) and it joins thread_count threads, including
 * pthread_t slots that were never initialized (undefined behavior).
 * The failure path now shuts down and joins only the workers actually
 * started, then frees everything itself.
 */
thread_pool_t *thread_pool_create(int thread_count) {
    thread_pool_t *pool;
    int i;
    if (thread_count <= 0) thread_count = 1;
    pool = malloc(sizeof(*pool));
    if (!pool) return NULL;
    pool->threads = malloc((size_t)thread_count * sizeof(pthread_t));
    if (!pool->threads) {
        free(pool);
        return NULL;
    }
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->notify, NULL);
    pool->tasks = NULL;
    pool->thread_count = thread_count;
    pool->queue_size = 0;
    pool->shutdown = 0;
    pool->started = 0;
    /* Spin up the workers. */
    for (i = 0; i < thread_count; i++) {
        if (pthread_create(&pool->threads[i], NULL,
                           thread_pool_worker, pool) != 0) {
            /* Unwind: stop and join only the threads we created. */
            pthread_mutex_lock(&pool->lock);
            pool->shutdown = 1;
            pthread_cond_broadcast(&pool->notify);
            pthread_mutex_unlock(&pool->lock);
            for (int j = 0; j < pool->started; j++) {
                pthread_join(pool->threads[j], NULL);
            }
            pthread_mutex_destroy(&pool->lock);
            pthread_cond_destroy(&pool->notify);
            free(pool->threads);
            free(pool);
            return NULL;
        }
        pool->started++;
    }
    return pool;
}
/* Append a task to the tail of the FIFO queue and wake one worker.
 * Returns 0 on success, -1 on bad arguments or allocation failure. */
int thread_pool_add_task(thread_pool_t *pool, void (*function)(void *), void *arg) {
    if (!pool || !function) {
        return -1;
    }
    thread_pool_task_t *node = malloc(sizeof(*node));
    if (!node) {
        return -1;
    }
    node->function = function;
    node->argument = arg;
    node->next = NULL;

    pthread_mutex_lock(&pool->lock);
    // Walk to the tail (O(n); fine for short queues)
    if (pool->tasks == NULL) {
        pool->tasks = node;
    } else {
        thread_pool_task_t *last = pool->tasks;
        while (last->next) {
            last = last->next;
        }
        last->next = node;
    }
    pool->queue_size++;
    pthread_cond_signal(&pool->notify);
    pthread_mutex_unlock(&pool->lock);
    return 0;
}
/*
 * Shut the pool down and reclaim all resources. Returns 0, or -1 for a
 * NULL pool. NOTE: the `graceful` flag is accepted for API symmetry
 * but never read — shutdown is always immediate, and tasks still
 * queued are freed unexecuted.
 */
int thread_pool_destroy(thread_pool_t *pool, int graceful) {
    if (pool == NULL) {
        return -1;
    }
    // Raise the shutdown flag and wake every sleeping worker
    pthread_mutex_lock(&pool->lock);
    pool->shutdown = 1;
    pthread_cond_broadcast(&pool->notify);
    pthread_mutex_unlock(&pool->lock);

    // Wait for all workers to exit
    for (int i = 0; i < pool->thread_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }

    // Discard any tasks that never ran
    while (pool->tasks) {
        thread_pool_task_t *dead = pool->tasks;
        pool->tasks = dead->next;
        free(dead);
    }

    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->notify);
    free(pool->threads);
    free(pool);
    return 0;
}
// ========== Thread-local storage (TLS) ==========
__thread int tls_counter = 0;   // GCC extension: one instance per thread

/* Bump this thread's private counter twice, printing after each step;
 * arg carries a numeric thread id. */
void *tls_worker(void *arg) {
    long id = (long)arg;
    for (int step = 0; step < 2; step++) {
        tls_counter++;
        printf("Thread %ld: tls_counter = %d\n", id, tls_counter);
    }
    return NULL;
}

/* Launch 5 threads; each sees its own zero-initialized tls_counter. */
void tls_demo() {
    pthread_t workers[5];
    long id;
    for (id = 0; id < 5; id++) {
        pthread_create(&workers[id], NULL, tls_worker, (void *)id);
    }
    for (id = 0; id < 5; id++) {
        pthread_join(workers[id], NULL);
    }
}
// ========== Priority inversion & priority inheritance ==========
// FIX: the original left the mutex and condvar uninitialized; that
// happens to work for zero-filled BSS on glibc but is not guaranteed
// by POSIX. PTHREAD_*_INITIALIZER is the portable static form.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int shared_resource = 0;   /* produced by the low-priority thread */
/* High-priority thread: waits on the condvar until the low-priority
 * thread produces shared_resource, then consumes it. */
void *high_priority_worker(void *arg) {
    pthread_mutex_lock(&mutex);
    // Wait for the low-priority thread to produce and release the lock
    while (shared_resource == 0) {
        pthread_cond_wait(&cond, &mutex);
    }
    shared_resource = 0;
    pthread_mutex_unlock(&mutex);
    return NULL;
}
/* Medium-priority thread: pure CPU spin that preempts the low-priority
 * lock holder, producing the classic priority-inversion scenario.
 * Never returns — the trailing return is unreachable. */
void *medium_priority_worker(void *arg) {
    // CPU-bound busy work, preempting lower-priority threads
    while (1) {
        for (volatile int i = 0; i < 10000000; i++);
    }
    return NULL;
}
/* Low-priority thread: takes the mutex, holds it for 5 s of simulated
 * work, then signals the high-priority waiter. */
void *low_priority_worker(void *arg) {
    pthread_mutex_lock(&mutex);
    shared_resource = 1;
    sleep(5);   // simulate a long critical section
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
    return NULL;
}
// Set a thread's scheduling priority while keeping its current policy.
// Returns 0 on success or a pthread error code.
// FIX: the original contained mojibake — `&param` had been corrupted
// into the HTML entity `¶m` ("&para;m"), which does not compile.
// The getschedparam result is now also checked before use.
// NOTE: non-zero priorities require a realtime policy (SCHED_FIFO/RR)
// and sufficient privileges.
int set_thread_priority(pthread_t thread, int priority) {
    struct sched_param param;
    int policy;
    int rc = pthread_getschedparam(thread, &policy, &param);
    if (rc != 0)
        return rc;
    param.sched_priority = priority;
    return pthread_setschedparam(thread, policy, &param);
}
// ========== Advanced read-write lock usage ==========
#include <pthread.h>
/* pthread_rwlock_t wrapper that additionally tracks how many readers
 * and writers currently hold the lock; the counters themselves are
 * guarded by count_mutex. */
typedef struct rw_lock {
    pthread_rwlock_t rwlock;
    int read_count;
    int write_count;
    pthread_mutex_t count_mutex;
} rw_lock_t;

/* Initialize the lock and zero both counters. */
void rw_lock_init(rw_lock_t *lock) {
    lock->read_count = 0;
    lock->write_count = 0;
    pthread_rwlock_init(&lock->rwlock, NULL);
    pthread_mutex_init(&lock->count_mutex, NULL);
}

/* Count a reader, then take the shared lock. */
void rw_lock_rdlock(rw_lock_t *lock) {
    pthread_mutex_lock(&lock->count_mutex);
    ++lock->read_count;
    pthread_mutex_unlock(&lock->count_mutex);
    pthread_rwlock_rdlock(&lock->rwlock);
}

/* Count a writer, then take the exclusive lock. */
void rw_lock_wrlock(rw_lock_t *lock) {
    pthread_mutex_lock(&lock->count_mutex);
    ++lock->write_count;
    pthread_mutex_unlock(&lock->count_mutex);
    pthread_rwlock_wrlock(&lock->rwlock);
}

/* Release the lock, decrementing whichever counter is active.
 * Heuristic: a pending writer count means a writer is releasing. */
void rw_lock_unlock(rw_lock_t *lock) {
    pthread_mutex_lock(&lock->count_mutex);
    if (lock->write_count > 0) {
        --lock->write_count;
    } else if (lock->read_count > 0) {
        --lock->read_count;
    }
    pthread_mutex_unlock(&lock->count_mutex);
    pthread_rwlock_unlock(&lock->rwlock);
}
// ========== Semaphores and barriers ==========
sem_t semaphore;            /* initialized in sync_demo() with count 3 */
pthread_barrier_t barrier;  /* initialized in sync_demo() for 5 parties */
/* Acquire the counting semaphore, hold it for 1 s, release. At most
 * three workers run the critical section concurrently. */
void *sem_worker(void *arg) {
    int id = *(int *)arg;
    sem_wait(&semaphore);
    printf("Worker %d: acquired semaphore\n", id);
    sleep(1);
    sem_post(&semaphore);
    return NULL;
}
/* Each worker prints, rendezvous at the barrier, then prints again —
 * the "after" lines can only appear once all 5 have arrived. */
void *barrier_worker(void *arg) {
    int id = *(int *)arg;
    printf("Worker %d: before barrier\n", id);
    pthread_barrier_wait(&barrier);
    printf("Worker %d: after barrier\n", id);
    return NULL;
}
/*
 * Demonstrate a counting semaphore (3 concurrent holders among 10
 * workers) and a 5-party barrier.
 *
 * FIX: the original declared threads[5] and ids[5] but the semaphore
 * section creates and joins 10 threads, writing past both arrays
 * (stack corruption / undefined behavior). Both arrays are now sized
 * for the larger loop.
 */
void sync_demo() {
    int i;
    pthread_t threads[10];
    int ids[10];
    // Semaphore example: up to 3 workers inside at once
    sem_init(&semaphore, 0, 3);
    for (i = 0; i < 10; i++) {
        ids[i] = i;
        pthread_create(&threads[i], NULL, sem_worker, &ids[i]);
    }
    for (i = 0; i < 10; i++) {
        pthread_join(threads[i], NULL);
    }
    sem_destroy(&semaphore);
    // Barrier example: 5 workers rendezvous before continuing
    pthread_barrier_init(&barrier, NULL, 5);
    for (i = 0; i < 5; i++) {
        ids[i] = i;
        pthread_create(&threads[i], NULL, barrier_worker, &ids[i]);
    }
    for (i = 0; i < 5; i++) {
        pthread_join(threads[i], NULL);
    }
    pthread_barrier_destroy(&barrier);
}
2.2 无锁编程与原子操作
/**
* 无锁编程与原子操作
* 基于C11原子操作标准库
*/
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdint.h>
// ========== Atomic operation basics ==========
atomic_int counter = ATOMIC_VAR_INIT(0);

/* Tour of the C11 atomic primitives; leaves counter == 200. */
void atomic_demo() {
    atomic_fetch_add(&counter, 1);   /* counter: 0 -> 1 */
    atomic_fetch_sub(&counter, 1);   /* counter: 1 -> 0 */

    /* Compare-and-swap: succeeds because counter is 0 here. */
    int want = 0;
    int replacement = 1;
    bool swapped = atomic_compare_exchange_strong(&counter, &want, replacement);
    (void)swapped;

    int snapshot = atomic_load(&counter);   /* atomic read */
    (void)snapshot;

    atomic_store(&counter, 100);            /* atomic write */

    int previous = atomic_exchange(&counter, 200);   /* returns 100 */
    (void)previous;
}
// ========== Lock-free stack (Treiber stack) ==========
/*
 * FIX: the original used a non-existent "atomic_ptr(T)" macro; C11
 * spells an atomic pointer _Atomic(T *).
 *
 * NOTE(review): pop() frees nodes immediately, so fully concurrent
 * pops are exposed to the ABA/use-after-free problem; production code
 * needs hazard pointers or epoch-based reclamation.
 */
typedef struct lockfree_stack_node {
    int data;
    _Atomic(struct lockfree_stack_node *) next;
} lockfree_stack_node_t;

typedef struct lockfree_stack {
    _Atomic(lockfree_stack_node_t *) top;
} lockfree_stack_t;

/* Initialize an empty stack. */
void lockfree_stack_init(lockfree_stack_t *stack) {
    atomic_init(&stack->top, NULL);
}

/* Push @value; silently drops it if allocation fails (as the original
 * did — a bool return would be better in real code). */
void lockfree_stack_push(lockfree_stack_t *stack, int value) {
    lockfree_stack_node_t *node = malloc(sizeof(*node));
    if (!node) return;
    node->data = value;
    atomic_init(&node->next, NULL);

    lockfree_stack_node_t *old_top = atomic_load(&stack->top);
    do {
        /* On CAS failure, old_top is reloaded by the CAS itself. */
        atomic_store(&node->next, old_top);
    } while (!atomic_compare_exchange_weak(&stack->top, &old_top, node));
}

/* Pop into *value; returns false when the stack is empty. */
bool lockfree_stack_pop(lockfree_stack_t *stack, int *value) {
    lockfree_stack_node_t *old_top = atomic_load(&stack->top);
    lockfree_stack_node_t *new_top;
    do {
        if (old_top == NULL) {
            return false;
        }
        new_top = atomic_load(&old_top->next);
    } while (!atomic_compare_exchange_weak(&stack->top, &old_top, new_top));
    *value = old_top->data;
    free(old_top);
    return true;
}
// ========== Lock-free queue (Michael-Scott algorithm) ==========
/* FIX: the original used a non-existent "atomic_ptr(T)" macro; C11
 * spells an atomic pointer _Atomic(T *). */
typedef struct lockfree_queue_node {
    int data;
    _Atomic(struct lockfree_queue_node *) next;
} lockfree_queue_node_t;

typedef struct lockfree_queue {
    _Atomic(lockfree_queue_node_t *) head;   /* always points at a dummy node */
    _Atomic(lockfree_queue_node_t *) tail;
} lockfree_queue_t;

/* Initialize with a shared dummy node so head/tail are never NULL.
 * NOTE(review): the dummy allocation is unchecked, as in the original. */
void lockfree_queue_init(lockfree_queue_t *queue) {
    lockfree_queue_node_t *dummy = malloc(sizeof(*dummy));
    atomic_init(&dummy->next, NULL);
    atomic_init(&queue->head, dummy);
    atomic_init(&queue->tail, dummy);
}

/* Append @value at the tail.
 * FIX: the original dereferenced the new node without checking malloc. */
void lockfree_queue_enqueue(lockfree_queue_t *queue, int value) {
    lockfree_queue_node_t *node = malloc(sizeof(*node));
    if (!node)
        return;
    node->data = value;
    atomic_init(&node->next, NULL);

    lockfree_queue_node_t *tail;
    lockfree_queue_node_t *next;
    while (1) {
        tail = atomic_load(&queue->tail);
        next = atomic_load(&tail->next);
        if (tail == atomic_load(&queue->tail)) {
            if (next == NULL) {
                /* Try to link the new node after the current tail. */
                if (atomic_compare_exchange_weak(&tail->next, &next, node)) {
                    break;   /* linked */
                }
            } else {
                /* Tail is lagging; help it forward. */
                atomic_compare_exchange_weak(&queue->tail, &tail, next);
            }
        }
    }
    /* Swing the tail to the new node (another thread may have already). */
    atomic_compare_exchange_weak(&queue->tail, &tail, node);
}

/* Remove the oldest element into *value; returns false when empty. */
bool lockfree_queue_dequeue(lockfree_queue_t *queue, int *value) {
    lockfree_queue_node_t *head;
    lockfree_queue_node_t *tail;
    lockfree_queue_node_t *next;
    while (1) {
        head = atomic_load(&queue->head);
        tail = atomic_load(&queue->tail);
        next = atomic_load(&head->next);
        if (head == atomic_load(&queue->head)) {
            if (head == tail) {
                if (next == NULL) {
                    return false;   /* queue is empty */
                }
                /* Tail is lagging; advance it. */
                atomic_compare_exchange_weak(&queue->tail, &tail, next);
            } else {
                /* Read the value before moving head. */
                *value = next->data;
                if (atomic_compare_exchange_weak(&queue->head, &head, next)) {
                    break;   /* dequeued */
                }
            }
        }
    }
    free(head);   /* old dummy; `next` becomes the new dummy */
    return true;
}
// ========== 读-改-写(RMW)操作 ==========
typedef struct {
uint64_t value;
uint64_t count;
} aba_counter_t;
void aba_protection_demo() {
// ABA问题演示与解决
atomic_uintptr_t ptr = ATOMIC_VAR_INIT(0);
// 使用双字比较交换(DCAS)解决ABA问题
// 或使用带计数器的指针
struct tagged_ptr {
void *ptr;
uint64_t tag;
};
_Atomic(struct tagged_ptr) tagged;
}
// ========== Memory barriers and sequential consistency ==========
atomic_int x = 0;
atomic_int y = 0;
int r1, r2;

/* Store x then load y with release/acquire ordering. */
void *thread_a(void *arg) {
    atomic_store_explicit(&x, 1, memory_order_release);
    r1 = atomic_load_explicit(&y, memory_order_acquire);
    return NULL;
}

/* Mirror image: store y then load x. */
void *thread_b(void *arg) {
    atomic_store_explicit(&y, 1, memory_order_release);
    r2 = atomic_load_explicit(&x, memory_order_acquire);
    return NULL;
}

/*
 * Run both threads concurrently. With only release/acquire, the
 * outcome r1 == 0 && r2 == 0 is permitted (the store/load pairs may
 * interleave); the trailing seq_cst pair shows the stricter ordering.
 */
void memory_order_demo() {
    pthread_t ta, tb;
    pthread_create(&ta, NULL, thread_a, NULL);
    pthread_create(&tb, NULL, thread_b, NULL);
    pthread_join(ta, NULL);
    pthread_join(tb, NULL);
    atomic_store_explicit(&x, 1, memory_order_seq_cst);
    r1 = atomic_load_explicit(&y, memory_order_seq_cst);
}
// ========== Lock-free linked list ==========
/* FIX: the original used a non-existent "atomic_ptr(T)" macro; C11
 * spells an atomic pointer _Atomic(T *). */
typedef struct lockfree_list_node {
    int key;
    _Atomic(struct lockfree_list_node *) next;
    atomic_bool marked;   /* logical-deletion flag */
} lockfree_list_node_t;

typedef struct lockfree_list {
    lockfree_list_node_t *head;   /* sentinel node supplied by the caller */
} lockfree_list_t;

/* Insert @key keeping the list sorted ascending (duplicates allowed,
 * placed after existing equal keys). Returns false on OOM. */
bool lockfree_list_insert(lockfree_list_t *list, int key) {
    lockfree_list_node_t *new_node = malloc(sizeof(*new_node));
    if (!new_node) return false;
    new_node->key = key;
    atomic_init(&new_node->next, NULL);
    atomic_init(&new_node->marked, false);
    lockfree_list_node_t *curr = list->head;
    lockfree_list_node_t *next;
    while (1) {
        next = atomic_load(&curr->next);
        if (next == NULL || next->key > key) {
            atomic_store(&new_node->next, next);
            if (atomic_compare_exchange_weak(&curr->next, &next, new_node)) {
                return true;
            }
            /* CAS failed: `next` was refreshed; retry from here. */
        } else {
            curr = next;
        }
    }
}

/* Mark-then-unlink delete; returns false if @key is absent.
 * NOTE(review): freeing the node immediately is unsafe under real
 * concurrency (other threads may still hold the pointer) — safe
 * reclamation needs hazard pointers or RCU. Original behavior kept. */
bool lockfree_list_delete(lockfree_list_t *list, int key) {
    lockfree_list_node_t *curr = list->head;
    lockfree_list_node_t *next;
    while (1) {
        next = atomic_load(&curr->next);
        if (next == NULL) return false;
        if (next->key == key) {
            /* Logically delete first; only the marking thread unlinks. */
            if (!atomic_exchange(&next->marked, true)) {
                if (atomic_compare_exchange_weak(&curr->next, &next,
                        atomic_load(&next->next))) {
                    free(next);
                    return true;
                }
            }
        }
        curr = next;
    }
}
第三篇:高性能网络编程
3.1 原始套接字与协议栈
/**
* 原始套接字编程
* 包括IP数据包构造、TCP SYN扫描、ICMP协议等
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <netinet/ip_icmp.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <sys/ioctl.h>
#include <errno.h>
// ========== IP header ==========
/* Hand-rolled IPv4 header. NOTE(review): the ip_hl/ip_v bitfield order
 * matches little-endian hosts (as in BSD's netinet/ip.h under
 * __LITTLE_ENDIAN); on big-endian machines the nibbles swap — confirm
 * target platforms. Multi-byte fields are stored in network order by
 * the code that fills them. */
struct ip_header {
    unsigned char ip_hl:4;    // header length in 32-bit words
    unsigned char ip_v:4;     // version (4)
    unsigned char ip_tos;     // type of service
    unsigned short ip_len;    // total length (header + payload)
    unsigned short ip_id;     // identification
    unsigned short ip_off;    // fragment offset + flags
    unsigned char ip_ttl;     // time to live
    unsigned char ip_p;       // encapsulated protocol
    unsigned short ip_sum;    // header checksum
    struct in_addr ip_src;    // source address
    struct in_addr ip_dst;    // destination address
};
// ========== TCP header ==========
struct tcp_header {
    unsigned short th_sport;  // source port
    unsigned short th_dport;  // destination port
    unsigned int th_seq;      // sequence number
    unsigned int th_ack;      // acknowledgement number
    unsigned char th_x2:4;    // reserved
    unsigned char th_off:4;   // data offset in 32-bit words
    unsigned char th_flags;   // flag bits (SYN = 0x02)
    unsigned short th_win;    // window size
    unsigned short th_sum;    // checksum (over pseudo-header + segment)
    unsigned short th_urp;    // urgent pointer
};
// ========== UDP header ==========
struct udp_header {
    unsigned short uh_sport;  // source port
    unsigned short uh_dport;  // destination port
    unsigned short uh_len;    // length of header + payload
    unsigned short uh_sum;    // checksum
};
// ========== ICMP header ==========
struct icmp_header {
    unsigned char icmp_type;  // message type (8 = echo request)
    unsigned char icmp_code;  // sub-code
    unsigned short icmp_sum;  // checksum
    unsigned short icmp_id;   // identifier (echo)
    unsigned short icmp_seq;  // sequence number (echo)
};
// ========== Internet checksum (RFC 1071 style) ==========
/*
 * One's-complement sum of 16-bit words over @buffer; a trailing odd
 * byte is added as-is. Words are read in host byte order, exactly as
 * in the classic BSD implementation, so the result slots directly
 * into a network-order header field.
 */
unsigned short checksum(void *buffer, int length) {
    unsigned short *word = buffer;
    unsigned long acc = 0;

    for (; length > 1; length -= 2) {
        acc += *word++;
    }
    if (length == 1) {
        acc += *(unsigned char *)word;
    }
    /* Fold the carries back into the low 16 bits. */
    acc = (acc >> 16) + (acc & 0xFFFF);
    acc += (acc >> 16);
    return (unsigned short)(~acc);
}
// ========== TCP pseudo-header (checksum input only) ==========
/* Prefixed to the TCP header when computing th_sum; never transmitted
 * on the wire. */
struct tcp_pseudo_header {
    struct in_addr src;
    struct in_addr dst;
    unsigned char zero;          // must be zero
    unsigned char protocol;      // IPPROTO_TCP
    unsigned short tcp_length;   // TCP header + payload length, network order
};
// ========== Build and send a TCP SYN packet ==========
/*
 * Craft a raw IPv4+TCP SYN from src_ip:src_port to dst_ip:dst_port.
 * Requires CAP_NET_RAW. Returns 0 on success, -1 on any error.
 *
 * FIX: the original computed the IP checksum as
 * checksum(packet, ip->ip_len) — ip_len is in *network* byte order, so
 * on little-endian hosts that reads thousands of bytes past the packet
 * data (out-of-bounds), and the IPv4 checksum only covers the header
 * anyway. It now checksums exactly the 20-byte header. `dest` is also
 * zeroed before use (sin_port/sin_zero were previously uninitialized).
 */
int send_tcp_syn(const char *src_ip, const char *dst_ip,
                 unsigned short src_port, unsigned short dst_port) {
    int sock;
    char packet[4096];
    struct ip_header *ip;
    struct tcp_header *tcp;
    struct sockaddr_in dest;
    struct tcp_pseudo_header pseudo;
    int one = 1;

    sock = socket(AF_INET, SOCK_RAW, IPPROTO_RAW);
    if (sock < 0) {
        perror("socket");
        return -1;
    }
    /* IP_HDRINCL: we supply the IP header ourselves. */
    if (setsockopt(sock, IPPROTO_IP, IP_HDRINCL, &one, sizeof(one)) < 0) {
        perror("setsockopt");
        close(sock);
        return -1;
    }
    memset(packet, 0, sizeof(packet));

    /* IPv4 header */
    ip = (struct ip_header *)packet;
    ip->ip_v = 4;
    ip->ip_hl = 5;   /* 5 * 4 = 20 bytes, no options */
    ip->ip_tos = 0;
    ip->ip_len = htons(sizeof(struct ip_header) + sizeof(struct tcp_header));
    ip->ip_id = htons(rand());
    ip->ip_off = 0;
    ip->ip_ttl = 64;
    ip->ip_p = IPPROTO_TCP;
    ip->ip_sum = 0;
    ip->ip_src.s_addr = inet_addr(src_ip);
    ip->ip_dst.s_addr = inet_addr(dst_ip);

    /* TCP header: bare SYN with a random ISN */
    tcp = (struct tcp_header *)(packet + sizeof(struct ip_header));
    tcp->th_sport = htons(src_port);
    tcp->th_dport = htons(dst_port);
    tcp->th_seq = htonl(rand());
    tcp->th_ack = 0;
    tcp->th_off = 5;
    tcp->th_flags = 0x02;   /* SYN */
    tcp->th_win = htons(65535);
    tcp->th_sum = 0;

    /* TCP checksum covers the pseudo-header + TCP header. */
    pseudo.src.s_addr = ip->ip_src.s_addr;
    pseudo.dst.s_addr = ip->ip_dst.s_addr;
    pseudo.zero = 0;
    pseudo.protocol = IPPROTO_TCP;
    pseudo.tcp_length = htons(sizeof(struct tcp_header));
    unsigned char tcp_checksum_data[sizeof(struct tcp_pseudo_header) +
                                    sizeof(struct tcp_header)];
    memcpy(tcp_checksum_data, &pseudo, sizeof(struct tcp_pseudo_header));
    memcpy(tcp_checksum_data + sizeof(struct tcp_pseudo_header),
           tcp, sizeof(struct tcp_header));
    tcp->th_sum = checksum(tcp_checksum_data,
                           sizeof(struct tcp_pseudo_header) + sizeof(struct tcp_header));

    /* IPv4 header checksum: exactly the 20 header bytes. */
    ip->ip_sum = checksum(packet, sizeof(struct ip_header));

    memset(&dest, 0, sizeof(dest));
    dest.sin_family = AF_INET;
    dest.sin_addr.s_addr = ip->ip_dst.s_addr;
    if (sendto(sock, packet, ntohs(ip->ip_len), 0,
               (struct sockaddr *)&dest, sizeof(dest)) < 0) {
        perror("sendto");
        close(sock);
        return -1;
    }
    close(sock);
    return 0;
}
// ========== TCP port scan (SYN scan) ==========
/*
 * Result slot for probing one (ip, port) pair; one slot is handed to each
 * syn_scan_worker thread, which fills in `open`.
 */
typedef struct {
char ip[16];            /* target address, dotted-quad string */
unsigned short port;    /* target port being probed */
int open;               /* set to 1 by the worker when the SYN was sent OK */
} scan_result_t;
/**
 * Thread worker: send one SYN probe described by the scan_result_t in arg.
 *
 * NOTE: `open` only records that the SYN was *sent* successfully; deciding
 * whether the port is really open would require sniffing the SYN-ACK,
 * which this simplified scanner does not do (and with a spoofed source
 * address the reply would not even reach us).
 */
void *syn_scan_worker(void *arg) {
    scan_result_t *result = (scan_result_t *)arg;
    char src_ip[16];

    /* Spoofed source address in 192.168.1.1 .. 192.168.1.254. */
    snprintf(src_ip, sizeof(src_ip), "192.168.1.%d", rand() % 254 + 1);

    /* Ephemeral source port in [1024, 65535]. The original used
     * rand() % 65535 + 1024, which can reach 66558 and silently wrap in
     * the unsigned short parameter. */
    unsigned short sport = (unsigned short)(1024 + rand() % (65536 - 1024));

    result->open = 0;
    if (send_tcp_syn(src_ip, result->ip, sport, result->port) == 0) {
        result->open = 1;   /* SYN sent; see NOTE above about semantics */
    }
    return NULL;
}
/**
 * SYN-scan target_ip for ports [start_port, end_port], running up to
 * 1000 probe threads at a time, then print the ports marked open.
 */
void syn_port_scan(const char *target_ip, unsigned short start_port,
                   unsigned short end_port) {
    enum { MAX_CONCURRENT = 1000 };
    pthread_t *threads;
    scan_result_t *results;
    int port;
    int thread_count = 0;

    /* The original kept pthread_t[65536] (~512 KB) and
     * scan_result_t[65536] (~1.5 MB) on the stack — a stack-overflow risk.
     * Allocate on the heap instead; only MAX_CONCURRENT thread handles
     * are ever live at once. */
    results = calloc(65536, sizeof(*results));
    threads = calloc(MAX_CONCURRENT, sizeof(*threads));
    if (!results || !threads) {
        fprintf(stderr, "syn_port_scan: out of memory\n");
        free(results);
        free(threads);
        return;
    }

    printf("Scanning %s from port %d to %d\n", target_ip, start_port, end_port);
    for (port = start_port; port <= end_port; port++) {
        /* Bounded copy into the 16-byte ip field (was an unchecked strcpy). */
        snprintf(results[port].ip, sizeof(results[port].ip), "%s", target_ip);
        results[port].port = (unsigned short)port;
        if (pthread_create(&threads[thread_count], NULL, syn_scan_worker,
                           &results[port]) != 0) {
            perror("pthread_create");
            continue;       /* slot stays open=0 */
        }
        thread_count++;
        /* Cap concurrency: drain the current batch before continuing. */
        if (thread_count >= MAX_CONCURRENT) {
            for (int i = 0; i < thread_count; i++) {
                pthread_join(threads[i], NULL);
            }
            thread_count = 0;
        }
    }
    /* Wait for the final partial batch. */
    for (int i = 0; i < thread_count; i++) {
        pthread_join(threads[i], NULL);
    }

    printf("\nOpen ports:\n");
    for (port = start_port; port <= end_port; port++) {
        if (results[port].open) {
            printf("Port %d is open\n", port);
        }
    }
    free(threads);
    free(results);
}
// ========== ICMP Ping implementation ==========
/**
 * Send one ICMP Echo Request (56 payload bytes) to dest_ip and wait up to
 * 2 seconds for the matching Echo Reply. Requires root / CAP_NET_RAW.
 * Returns 0 if a matching reply arrived, -1 on error or timeout.
 */
int send_icmp_echo(const char *dest_ip) {
    int sock;
    char packet[4096];
    struct icmp_header *icmp;
    struct sockaddr_in dest;

    sock = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP);
    if (sock < 0) {
        perror("socket");
        return -1;
    }
    memset(packet, 0, sizeof(packet));

    /* Build only the ICMP header: with IPPROTO_ICMP (no IP_HDRINCL) the
     * kernel prepends the IP header itself. */
    unsigned short ident = (unsigned short)(getpid() & 0xffff);
    icmp = (struct icmp_header *)packet;
    icmp->icmp_type = 8;                /* Echo Request */
    icmp->icmp_code = 0;
    icmp->icmp_id = htons(ident);
    icmp->icmp_seq = htons(1);
    icmp->icmp_sum = 0;

    /* 56 bytes of payload, like classic ping. */
    char *data = packet + sizeof(struct icmp_header);
    memset(data, 'A', 56);
    icmp->icmp_sum = checksum(packet, sizeof(struct icmp_header) + 56);

    memset(&dest, 0, sizeof(dest));     /* was passed to sendto uninitialized */
    dest.sin_family = AF_INET;
    dest.sin_addr.s_addr = inet_addr(dest_ip);
    if (sendto(sock, packet, sizeof(struct icmp_header) + 56, 0,
               (struct sockaddr *)&dest, sizeof(dest)) < 0) {
        perror("sendto");
        close(sock);
        return -1;
    }

    /* 2-second receive timeout. */
    struct timeval tv = { .tv_sec = 2, .tv_usec = 0 };
    setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));

    struct sockaddr_in from;
    socklen_t fromlen = sizeof(from);
    char recv_buffer[4096];
    int n = recvfrom(sock, recv_buffer, sizeof(recv_buffer), 0,
                     (struct sockaddr *)&from, &fromlen);
    close(sock);

    if (n > 0) {
        /* Raw ICMP receives include the IP header; skip ip_hl * 4 bytes,
         * but only after checking enough bytes actually arrived (the
         * original parsed the reply without any length validation). */
        struct ip_header *recv_ip = (struct ip_header *)recv_buffer;
        size_t hdr_len = (size_t)recv_ip->ip_hl * 4;
        if ((size_t)n >= hdr_len + sizeof(struct icmp_header)) {
            struct icmp_header *recv_icmp =
                (struct icmp_header *)(recv_buffer + hdr_len);
            /* Match type AND id: a raw ICMP socket sees every ICMP packet
             * on the host, including replies to other processes' pings. */
            if (recv_icmp->icmp_type == 0 &&
                ntohs(recv_icmp->icmp_id) == ident) {
                printf("Reply from %s: icmp_seq=%d time=%dms\n",
                       dest_ip, ntohs(recv_icmp->icmp_seq), 0);
                return 0;
            }
        }
    }
    printf("Request timeout for %s\n", dest_ip);
    return -1;
}
3.2 epoll/IOCP高性能服务器
/**
* 基于epoll的高性能TCP服务器
* 支持非阻塞IO、边缘触发、多线程事件循环
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#define MAX_EVENTS 10240
#define BUFFER_SIZE 8192
#define THREAD_COUNT 4
// ========== Connection structure ==========
/*
 * Per-connection state for the epoll server: the socket fd, fixed-size
 * read/write buffers with progress counters, and an intrusive free-list
 * link used by connection_pool_t.
 */
typedef struct connection {
int fd;                         /* client socket; -1 while slot is in the pool */
char read_buffer[BUFFER_SIZE];  /* inbound request bytes */
char write_buffer[BUFFER_SIZE]; /* outbound response bytes */
size_t read_len;                /* bytes accumulated in read_buffer */
size_t write_len;               /* total bytes to send from write_buffer */
size_t write_sent;              /* bytes already written to the socket */
int status; // 0: reading, 1: writing
struct connection *next;        /* free-list link (NULL while in use) */
} connection_t;
// ========== Connection pool ==========
/*
 * Pre-allocated pool of connection_t objects. `all_connections` owns the
 * whole array; `free_list` chains the currently unused slots via their
 * `next` pointers. `lock` guards acquire/release from multiple workers.
 */
typedef struct connection_pool {
connection_t *free_list;        /* stack of available slots */
connection_t *all_connections;  /* backing array (owned, freed on destroy) */
int count;                      /* total number of slots allocated */
pthread_mutex_t lock;           /* protects free_list */
} connection_pool_t;
connection_pool_t *create_connection_pool(int max_connections) {
connection_pool_t *pool = malloc(sizeof(connection_pool_t));
if (!pool) return NULL;
pool->all_connections = malloc(max_connections * sizeof(connection_t));
if (!pool->all_connections) {
free(pool);
return NULL;
}
pool->free_list = NULL;
pool->count = max_connections;
pthread_mutex_init(&pool->lock, NULL);
// 初始化连接池
for (int i = 0; i < max_connections; i++) {
connection_t *conn = &pool->all_connections[i];
conn->fd = -1;
conn->next = pool->free_list;
pool->free_list = conn;
}
return pool;
}
/**
 * Pop one slot from the free list and reset its buffers and counters.
 * Thread-safe; returns NULL when the pool is exhausted.
 */
connection_t *connection_pool_acquire(connection_pool_t *pool) {
    pthread_mutex_lock(&pool->lock);
    connection_t *slot = pool->free_list;
    if (slot != NULL) {
        pool->free_list = slot->next;
        slot->next = NULL;
    }
    pthread_mutex_unlock(&pool->lock);

    if (slot == NULL)
        return NULL;

    /* Fresh state for the new connection. */
    memset(slot->read_buffer, 0, BUFFER_SIZE);
    memset(slot->write_buffer, 0, BUFFER_SIZE);
    slot->read_len = 0;
    slot->write_len = 0;
    slot->write_sent = 0;
    slot->status = 0;
    return slot;
}
/**
 * Return a slot to the pool, closing its socket first if still open.
 * Thread-safe; a NULL conn is ignored.
 */
void connection_pool_release(connection_pool_t *pool, connection_t *conn) {
    if (conn == NULL)
        return;

    if (conn->fd != -1) {
        close(conn->fd);
        conn->fd = -1;
    }

    pthread_mutex_lock(&pool->lock);
    conn->next = pool->free_list;
    pool->free_list = conn;
    pthread_mutex_unlock(&pool->lock);
}
/**
 * Tear down the pool: destroy the mutex and free the slot array and the
 * pool itself. Safe to call with NULL.
 */
void destroy_connection_pool(connection_pool_t *pool) {
    if (pool == NULL)
        return;
    pthread_mutex_destroy(&pool->lock);
    free(pool->all_connections);
    free(pool);
}
// ========== Set non-blocking mode ==========
/* Add O_NONBLOCK to fd's file status flags; returns -1 on fcntl failure. */
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    return (flags == -1) ? -1 : fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
// ========== Create the listening socket ==========
/**
 * Create, bind and start a non-blocking IPv4 listening socket on `port`
 * (all interfaces). Returns the listening fd, or -1 on error after
 * printing a diagnostic.
 */
int create_listen_socket(int port) {
    struct sockaddr_in addr;
    int reuse = 1;

    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd < 0) {
        perror("socket");
        return -1;
    }
    /* SO_REUSEADDR: allow quick restart while old sockets sit in TIME_WAIT. */
    if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR,
                   &reuse, sizeof(reuse)) < 0) {
        perror("setsockopt");
        close(listen_fd);
        return -1;
    }
    /* Zero the whole sockaddr first: the original passed bind() a struct
     * whose sin_zero padding was uninitialized. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);
    if (bind(listen_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        close(listen_fd);
        return -1;
    }
    if (listen(listen_fd, SOMAXCONN) < 0) {
        perror("listen");
        close(listen_fd);
        return -1;
    }
    /* Non-blocking is required for the edge-triggered accept loop; treat
     * failure as fatal instead of ignoring it like the original did. */
    if (set_nonblocking(listen_fd) == -1) {
        perror("fcntl");
        close(listen_fd);
        return -1;
    }
    return listen_fd;
}
// ========== Handle an HTTP request ==========
/**
 * Build the canned "Hello, World!" HTTP response into conn->write_buffer
 * and switch the connection into the writing state.
 *
 * Fix: the original hard-coded "Content-Length: 50" for a 48-byte body,
 * producing a malformed response; the length is now computed from the
 * body so the two can never drift apart.
 */
void handle_http_request(connection_t *conn) {
    const char *body = "<html><body><h1>Hello, World!</h1></body></html>";
    int n = snprintf(conn->write_buffer, sizeof(conn->write_buffer),
                     "HTTP/1.1 200 OK\r\n"
                     "Content-Type: text/html\r\n"
                     "Content-Length: %zu\r\n"
                     "\r\n"
                     "%s",
                     strlen(body), body);
    conn->write_len = (n > 0) ? (size_t)n : 0;
    conn->write_sent = 0;
    conn->status = 1;   /* switch to the write state */
}
// ========== Read data ==========
/**
 * Edge-triggered read handler: drain the socket until EAGAIN, and when a
 * complete request (terminated by "\r\n\r\n") has arrived, build the
 * response and switch the connection's epoll registration to EPOLLOUT.
 *
 * Returns 0 to keep the connection, -1 to have the caller close it.
 */
int handle_read(connection_t *conn, int epoll_fd) {
    int n;
    while (1) {
        size_t space = BUFFER_SIZE - conn->read_len;
        if (space == 0) {
            /* Request larger than the buffer: reject it explicitly.
             * The original called read(fd, buf, 0) here, whose 0 return
             * was then misinterpreted as "peer closed". */
            return -1;
        }
        n = read(conn->fd, conn->read_buffer + conn->read_len, space);
        if (n == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;      /* socket drained */
            }
            return -1;      /* read error */
        } else if (n == 0) {
            return -1;      /* peer closed */
        }
        conn->read_len += n;
        /* Heuristic end-of-request check: buffer ends with \r\n\r\n. */
        if (conn->read_len >= 4 &&
            memcmp(conn->read_buffer + conn->read_len - 4, "\r\n\r\n", 4) == 0) {
            handle_http_request(conn);
            /* Re-arm this fd for write readiness (edge-triggered). */
            struct epoll_event ev;
            ev.events = EPOLLOUT | EPOLLET;
            ev.data.ptr = conn;
            if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, conn->fd, &ev) == -1) {
                return -1;  /* can't re-arm: drop the connection */
            }
            return 0;
        }
    }
    return 0;
}
// ========== Write data ==========
/**
 * Flush the pending response bytes to the socket. Returns 0 if the kernel
 * buffer filled up (wait for the next EPOLLOUT), and -1 both on error and
 * once everything has been sent — the caller closes the connection either
 * way (one response per connection).
 */
int handle_write(connection_t *conn, int epoll_fd) {
    while (conn->write_sent < conn->write_len) {
        int written = write(conn->fd,
                            conn->write_buffer + conn->write_sent,
                            conn->write_len - conn->write_sent);
        if (written == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                return 0;   /* socket not writable right now */
            }
            return -1;      /* write error */
        }
        conn->write_sent += written;
    }
    return -1;              /* response fully sent: close */
}
// ========== Accept new connections ==========
/**
 * Accept every pending connection on the (non-blocking) listening socket,
 * attach a pool slot to each and register it with this worker's epoll
 * instance. Returns 0 when the backlog is drained, -1 on accept error.
 */
int handle_accept(int listen_fd, int epoll_fd, connection_pool_t *pool) {
    struct sockaddr_in client_addr;
    int client_fd;
    connection_t *conn;

    while (1) {
        /* accept() may shrink this in/out argument; reset it every round. */
        socklen_t client_len = sizeof(client_addr);
        client_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_len);
        if (client_fd == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                return 0;   /* no more pending connections */
            }
            perror("accept");
            return -1;
        }
        conn = connection_pool_acquire(pool);
        if (!conn) {
            close(client_fd);   /* pool exhausted: refuse the connection */
            continue;
        }
        conn->fd = client_fd;
        set_nonblocking(client_fd);

        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
        ev.data.ptr = conn;
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
            /* The original ignored this failure, leaking both the fd and
             * the pool slot. release() also closes the fd. */
            perror("epoll_ctl");
            connection_pool_release(pool, conn);
            continue;
        }
        /* inet_ntop instead of inet_ntoa: inet_ntoa returns a static
         * buffer and is not safe with multiple worker threads. */
        char addr_str[INET_ADDRSTRLEN];
        inet_ntop(AF_INET, &client_addr.sin_addr, addr_str, sizeof(addr_str));
        printf("New connection from %s:%d\n",
               addr_str, ntohs(client_addr.sin_port));
    }
}
// ========== Worker thread ==========
/**
 * Per-worker context. The listening fd is carried explicitly so the event
 * loop can tell "listening socket readable" apart from connection events.
 * (The original compared events[i].data.fd against the *epoll* fd, which
 * never matches the listening socket, so accept-ready events were cast to
 * connection pointers and dereferenced as garbage.)
 */
typedef struct worker_thread_data {
    int epoll_fd;
    int listen_fd;
    connection_pool_t *pool;
} worker_thread_data_t;

/**
 * Event loop for one worker thread. The listening socket is registered
 * with data.ptr == NULL as a sentinel; every client connection event
 * carries its connection_t pointer in data.ptr, so the two can never
 * be confused.
 */
void *worker_thread(void *arg) {
    worker_thread_data_t *data = (worker_thread_data_t *)arg;
    struct epoll_event events[MAX_EVENTS];

    while (1) {
        int nfds = epoll_wait(data->epoll_fd, events, MAX_EVENTS, -1);
        if (nfds == -1) {
            if (errno == EINTR) continue;
            perror("epoll_wait");
            break;
        }
        for (int i = 0; i < nfds; i++) {
            if (events[i].data.ptr == NULL) {
                /* Sentinel: the shared listening socket is ready. */
                handle_accept(data->listen_fd, data->epoll_fd, data->pool);
                continue;
            }
            connection_t *conn = (connection_t *)events[i].data.ptr;
            int ret = 0;
            if (events[i].events & EPOLLIN) {
                ret = handle_read(conn, data->epoll_fd);
            } else if (events[i].events & EPOLLOUT) {
                ret = handle_write(conn, data->epoll_fd);
            } else if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
                /* Pure error/hang-up events previously fell through with
                 * ret == 0 and leaked the connection forever. */
                ret = -1;
            }
            if (ret == -1) {
                epoll_ctl(data->epoll_fd, EPOLL_CTL_DEL, conn->fd, NULL);
                connection_pool_release(data->pool, conn);
            }
        }
    }
    return NULL;
}

// ========== Multi-threaded epoll server ==========
/**
 * Start THREAD_COUNT workers, each with its own epoll instance watching
 * the shared listening socket, then block until they exit.
 */
void run_multi_thread_epoll_server(int port) {
    int listen_fd = create_listen_socket(port);
    if (listen_fd < 0) {
        exit(EXIT_FAILURE);
    }
    connection_pool_t *pool = create_connection_pool(10000);
    if (!pool) {
        close(listen_fd);
        exit(EXIT_FAILURE);
    }

    pthread_t threads[THREAD_COUNT];
    worker_thread_data_t thread_data[THREAD_COUNT];
    int started = 0;

    for (int i = 0; i < THREAD_COUNT; i++) {
        /* Each worker owns an independent epoll instance. */
        int epoll_fd = epoll_create1(0);
        if (epoll_fd < 0) {
            perror("epoll_create1");
            continue;
        }
        /* Register the shared listening socket. data.ptr == NULL is the
         * sentinel worker_thread uses to recognize it. */
        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.ptr = NULL;
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
            perror("epoll_ctl");
            close(epoll_fd);
            continue;
        }
        thread_data[started].epoll_fd = epoll_fd;
        thread_data[started].listen_fd = listen_fd;
        thread_data[started].pool = pool;
        if (pthread_create(&threads[started], NULL, worker_thread,
                           &thread_data[started]) != 0) {
            perror("pthread_create");
            close(epoll_fd);
            continue;
        }
        started++;
    }

    printf("Server running on port %d with %d threads\n", port, THREAD_COUNT);
    for (int i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
        close(thread_data[i].epoll_fd);
    }
    destroy_connection_pool(pool);
    close(listen_fd);
}