Chapter 29: Threads and Atomics

Overview

The previous chapter's filesystem pipelines laid the groundwork for applications that produce and consume data in parallel. Now we turn to how Zig launches OS threads, coordinates work across cores, and keeps shared state consistent with atomic operations (see Chapter 28 and Thread.zig).

Zig 0.15.2's threading primitives pair a lightweight spawn API with explicit memory orderings, so you decide when stores become visible and when to block on contention. Understanding these tools demystifies the parallel word-frequency project coming up in Chapter 30 (see atomic.zig).

Learning Goals

  • Spawn and join worker threads responsibly, choosing stack sizes and allocators only when necessary.
  • Choose memory orderings for the atomic loads, stores, and compare-and-swap loops that guard shared state.
  • Detect single-threaded builds at compile time and fall back to a synchronous execution path.

Coordinating Work with std.Thread

Zig models kernel threads with std.Thread, providing helpers to query the CPU count, configure stack sizes, and join handles deterministically. Unlike async I/O, these are real kernel threads: every spawn consumes OS resources, so batching units of work matters.

The Thread Pool Pattern

Before diving into manual thread spawning, it is worth understanding the thread pool pattern the Zig compiler itself uses for parallel work. The following diagram shows how std.Thread.Pool distributes work across workers:

graph TB
    ThreadPool["ThreadPool<br/>(std.Thread.Pool)"]
    AstGen1["AstGen<br/>(File 1)"]
    AstGen2["AstGen<br/>(File 2)"]
    AstGen3["AstGen<br/>(File 3)"]
    Sema1["Sema<br/>(Function 1)"]
    Sema2["Sema<br/>(Function 2)"]
    Codegen1["CodeGen<br/>(Function 1)"]
    Codegen2["CodeGen<br/>(Function 2)"]
    ThreadPool --> AstGen1
    ThreadPool --> AstGen2
    ThreadPool --> AstGen3
    ThreadPool --> Sema1
    ThreadPool --> Sema2
    ThreadPool --> Codegen1
    ThreadPool --> Codegen2
    ZcuPerThread["Zcu.PerThread<br/>(per-thread state)"]
    Sema1 -.->|"uses"| ZcuPerThread
    Sema2 -.->|"uses"| ZcuPerThread

A thread pool maintains a fixed number of worker threads that pull work items from a queue, avoiding the overhead of repeatedly spawning and joining threads. The Zig compiler uses this pattern extensively: std.Thread.Pool dispatches AST generation, semantic analysis, and code generation tasks to workers. Each worker keeps per-thread state (Zcu.PerThread) to minimize synchronization; only final results need mutex protection before merging into shared structures such as InternPool.shards. The architecture illustrates key concurrency design principles: units of work should be independent, shared state should be sharded or mutex-guarded, and per-thread caches reduce contention. Prefer std.Thread.Pool over manual spawning when a workload consists of many small tasks, as in the sketch below; manual spawn/join is appropriate when you need a few long-running workers with specific responsibilities.
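
Here is a minimal sketch of the many-small-tasks case, assuming the std.Thread.Pool and std.Thread.WaitGroup APIs as of Zig 0.15; printTask is a stand-in workload, not part of the library:

Zig

// Minimal std.Thread.Pool sketch: queue many small tasks, then wait for all of them.
const std = @import("std");

// Stand-in workload; a real task would do meaningful work per item.
fn printTask(id: usize) void {
    std.debug.print("task {d} running\n", .{id});
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    // The pool sizes itself to the CPU count unless n_jobs is overridden.
    var pool: std.Thread.Pool = undefined;
    try pool.init(.{ .allocator = gpa.allocator() });
    defer pool.deinit();

    // A WaitGroup tracks outstanding tasks so we can join them all at once.
    var wg: std.Thread.WaitGroup = .{};
    for (0..8) |id| {
        pool.spawnWg(&wg, printTask, .{id});
    }
    // Participate in the queue while waiting, then return when all tasks finish.
    pool.waitAndWork(&wg);
}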

Chunking Data with spawn/join

The example below partitions an array of integers across a dynamic number of workers, using an atomic fetch-add to accumulate the total of the even values without a lock. It adapts to the host's CPU count but never spawns more threads than there are elements to process.

Zig

// This example demonstrates parallel computation with threads and atomics in Zig.
// It sums the even numbers in an array by distributing the work across threads.
const std = @import("std");

// Arguments passed to each worker thread for parallel processing
const WorkerArgs = struct {
    slice: []const u64,                  // the subset of numbers this worker should process
    sum: *std.atomic.Value(u64),         // shared atomic counter for thread-safe accumulation
};

// Worker function that accumulates even values from its assigned shard.
// Each thread runs this independently on its own partition of the data.
fn accumulate(args: WorkerArgs) void {
    // Use a local accumulator to minimize atomic operations (a performance optimization)
    var local_total: u64 = 0;
    for (args.slice) |value| {
        if (value % 2 == 0) {
            local_total += value;
        }
    }

    // Atomically add the local result to the shared sum using sequentially consistent ordering
    // This ensures all threads see a consistent view of the shared state
    _ = args.sum.fetchAdd(local_total, .seq_cst);
}

pub fn main() !void {
    // Set up memory allocator with automatic leak detection
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Allocate array of 64 numbers for demonstration
    var numbers = try allocator.alloc(u64, 64);
    defer allocator.free(numbers);

    // Initialize array with values following the pattern: index * 7 + 3
    for (numbers, 0..) |*slot, index| {
        slot.* = @as(u64, @intCast(index * 7 + 3));
    }

    // Initialize shared atomic counter that all threads will safely update
    var shared_sum = std.atomic.Value(u64).init(0);

    // Determine optimal number of worker threads based on available CPU cores
    const cpu_count = std.Thread.getCpuCount() catch 1;
    const desired = if (cpu_count == 0) 1 else cpu_count;
    // Don't create more threads than we have numbers to process
    const worker_limit = @min(numbers.len, desired);

    // Allocate thread handles for parallel workers
    var threads = try allocator.alloc(std.Thread, worker_limit);
    defer allocator.free(threads);

    // Calculate chunk size, rounding up to ensure all elements are covered
    const chunk = (numbers.len + worker_limit - 1) / worker_limit;

    // Spawn worker threads, distributing the array into roughly equal chunks
    var start: usize = 0;
    var spawned: usize = 0;
    while (start < numbers.len and spawned < worker_limit) : (spawned += 1) {
        const remaining = numbers.len - start;
        // Give the last thread all remaining elements to handle uneven divisions
        const take = if (worker_limit - spawned == 1) remaining else @min(chunk, remaining);
        const end = start + take;

        // Spawn thread with its assigned slice and shared accumulator
        threads[spawned] = try std.Thread.spawn(.{}, accumulate, .{WorkerArgs{
            .slice = numbers[start..end],
            .sum = &shared_sum,
        }});

        start = end;
    }

    // Track how many threads were actually spawned (may be less than worker_limit)
    const used_threads = spawned;

    // Wait for all worker threads to complete their work
    for (threads[0..used_threads]) |thread| {
        thread.join();
    }

    // Read the final accumulated result from the atomic shared sum
    const even_sum = shared_sum.load(.seq_cst);

    // Perform sequential calculation to verify correctness of parallel computation
    var sequential: u64 = 0;
    for (numbers) |value| {
        if (value % 2 == 0) {
            sequential += value;
        }
    }

    // Set up buffered stdout writer for efficient output
    var stdout_buffer: [256]u8 = undefined;
    var stdout_state = std.fs.File.stdout().writer(&stdout_buffer);
    const out = &stdout_state.interface;

    // Display results: thread count and both parallel and sequential sums
    try out.print("spawned {d} worker(s)\n", .{used_threads});
    try out.print("even sum (threads): {d}\n", .{even_sum});
    try out.print("even sum (sequential check): {d}\n", .{sequential});
    try out.flush();
}
Run
Shell
$ zig run 01_parallel_even_sum.zig
Output
Shell
spawned 8 worker(s)
even sum (threads): 7264
even sum (sequential check): 7264

std.atomic.Value wraps a plain integer and routes every access through @atomicLoad, @atomicStore, or @atomicRmw, protecting you from accidentally mixing atomic and non-atomic access to the same memory location.
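
A quick sketch of the wrapper in action; each method takes its memory ordering explicitly and maps directly onto the corresponding builtin:

Zig

const std = @import("std");

pub fn main() void {
    var counter = std.atomic.Value(u32).init(0);

    _ = counter.fetchAdd(5, .monotonic); // routed through @atomicRmw(.Add, ...)
    counter.store(42, .release); // routed through @atomicStore
    const seen = counter.load(.acquire); // routed through @atomicLoad

    std.debug.print("counter = {d}\n", .{seen});
}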

Spawn Configuration and Scheduling Hints

std.Thread.SpawnConfig lets you override the stack size or supply a custom allocator when the defaults do not fit (deep recursion or a preallocated arena, for example). Catch errors from Thread.getCpuCount() so you have a safe fallback, and remember Thread.yield() and Thread.sleep() when you need cooperative scheduling while waiting on other threads to make progress.
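
As a sketch of overriding the default, the worker below requests a larger stack for a deeply recursive job; the 16 MiB figure and the deepRecursion workload are illustrative only:

Zig

const std = @import("std");

// Illustrative deep-recursion workload that would stress a small default stack.
fn deepRecursion(depth: usize) void {
    if (depth == 0) return;
    deepRecursion(depth - 1);
}

pub fn main() !void {
    // Fall back to one worker when the CPU count cannot be determined.
    const cpu_count = std.Thread.getCpuCount() catch 1;
    std.debug.print("cpus: {d}\n", .{cpu_count});

    // SpawnConfig.stack_size requests a 16 MiB stack for this thread.
    const worker = try std.Thread.spawn(
        .{ .stack_size = 16 * 1024 * 1024 },
        deepRecursion,
        .{100_000},
    );
    worker.join();
}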

Atomic State Machines

Zig exposes LLVM's atomic intrinsics directly: you pick an ordering such as .acquire, .release, or .seq_cst, and the compiler emits the matching fences. That explicitness pays off when you design small state machines that several threads must observe consistently, such as a one-shot initializer.

A Once Guard with Atomic Builtins

This program builds a lock-free "call once" helper around @cmpxchgStrong. Threads spin only while another thread is running the initializer, then read the published value with an acquire load.

Zig

// This example demonstrates thread-safe one-time initialization using atomic operations.
// Multiple threads attempt to initialize a shared resource, but only one succeeds in
// performing the expensive initialization exactly once.

const std = @import("std");

// Represents the initialization state using atomic operations
const State = enum(u8) { idle, busy, ready };

// Global state tracking the initialization lifecycle
var once_state: State = .idle;
// The shared configuration value that will be initialized once
var config_value: i32 = 0;
// Counter to verify that initialization only happens once
var init_calls: u32 = 0;

// Simulates an expensive initialization operation that should only run once.
// Uses atomic operations to safely increment the call counter and set the config value.
fn expensiveInit() void {
    // Simulate expensive work with a sleep
    std.Thread.sleep(2 * std.time.ns_per_ms);
    // Atomically increment the initialization call counter
    _ = @atomicRmw(u32, &init_calls, .Add, 1, .seq_cst);
    // Atomically store the initialized value with release semantics
    @atomicStore(i32, &config_value, 9157, .release);
}

// Ensures expensiveInit() is called exactly once across multiple threads.
// Uses a state machine with compare-and-swap to coordinate thread access.
fn callOnce() void {
    while (true) {
        // Check the current state with acquire semantics to see initialization results
        switch (@atomicLoad(State, &once_state, .acquire)) {
            // Initialization complete, return immediately
            .ready => return,
            // Another thread is initializing, yield and retry
            .busy => {
                std.Thread.yield() catch {};
                continue;
            },
            // Not yet initialized, attempt to claim initialization responsibility
            .idle => {
                // Try to atomically transition from idle to busy
                // If successful (returns null), this thread wins and will initialize
                // If it fails (returns the actual value), another thread won, so retry
                if (@cmpxchgStrong(State, &once_state, .idle, .busy, .acq_rel, .acquire)) |_| {
                    continue;
                }
                // This thread successfully claimed the initialization
                break;
            },
        }
    }

    // Perform the one-time initialization
    expensiveInit();
    // Mark initialization as complete with release semantics
    @atomicStore(State, &once_state, .ready, .release);
}

// Arguments passed to each worker thread
const WorkerArgs = struct {
    results: []i32,
    index: usize,
};

// Worker thread function that calls the once-initialization and reads the result.
fn worker(args: WorkerArgs) void {
    // Ensure initialization happens (blocks until complete if another thread is initializing)
    callOnce();
    // Read the initialized value with acquire semantics
    const value = @atomicLoad(i32, &config_value, .acquire);
    // Store the observed value in the thread's result slot
    args.results[args.index] = value;
}

pub fn main() !void {
    // Reset global state for demonstration
    once_state = .idle;
    config_value = 0;
    init_calls = 0;

    // Set up memory allocation
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const worker_count: usize = 4;

    // Allocate array to collect results from each thread
    const results = try allocator.alloc(i32, worker_count);
    defer allocator.free(results);
    // Initialize all result slots to -1 to detect if any thread fails
    for (results) |*slot| slot.* = -1;

    // Allocate array to hold thread handles
    const threads = try allocator.alloc(std.Thread, worker_count);
    defer allocator.free(threads);

    // Spawn all worker threads
    for (threads, 0..) |*thread, index| {
        thread.* = try std.Thread.spawn(.{}, worker, .{WorkerArgs{
            .results = results,
            .index = index,
        }});
    }

    // Wait for all threads to complete
    for (threads) |thread| {
        thread.join();
    }

    // Read final values after all threads complete
    const final_value = @atomicLoad(i32, &config_value, .acquire);
    const called = @atomicLoad(u32, &init_calls, .seq_cst);

    // Set up buffered output
    var stdout_buffer: [256]u8 = undefined;
    var stdout_state = std.fs.File.stdout().writer(&stdout_buffer);
    const out = &stdout_state.interface;

    // Print the value observed by each thread (should all be 9157)
    for (results, 0..) |value, index| {
        try out.print("thread {d} observed {d}\n", .{ index, value });
    }
    // Verify initialization was called exactly once
    try out.print("init calls: {d}\n", .{called});
    // Display the final configuration value
    try out.print("config value: {d}\n", .{final_value});
    try out.flush();
}
Run
Shell
$ zig run 02_atomic_once.zig
Output
Shell
thread 0 observed 9157
thread 1 observed 9157
thread 2 observed 9157
thread 3 observed 9157
init calls: 1
config value: 9157

@cmpxchgStrong returns null on success, so looping while it yields a value is a concise way to retry a CAS without reaching for a mutex. Pair the final @atomicStore with .release so the result is published to any waiter that performs an .acquire load.
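
The same retry idiom generalizes beyond once guards. As a sketch, the helper below keeps an atomic high-water mark with @cmpxchgWeak, which may fail spuriously and is therefore always wrapped in a loop; high_water and storeMax are illustrative names:

Zig

const std = @import("std");

var high_water: u64 = 0;

// Atomically raise high_water to candidate if candidate is larger.
// On failure, @cmpxchgWeak returns the value it actually saw, so the
// loop retries from that fresh observation instead of re-reading.
fn storeMax(candidate: u64) void {
    var observed = @atomicLoad(u64, &high_water, .monotonic);
    while (observed < candidate) {
        observed = @cmpxchgWeak(
            u64,
            &high_water,
            observed,
            candidate,
            .monotonic,
            .monotonic,
        ) orelse return; // null means our swap landed
    }
}

pub fn main() void {
    storeMax(17);
    storeMax(9);
    storeMax(42);
    std.debug.print("high water = {d}\n", .{@atomicLoad(u64, &high_water, .monotonic)});
}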

Single-Threaded Builds and Fallbacks

Passing -Dsingle-threaded=true (or -fsingle-threaded when invoking zig run or zig build-exe directly) makes the compiler reject any attempt to spawn OS threads. Code that may run in both configurations should branch on builtin.single_threaded at compile time and substitute an inline execution path. See builtin.zig.

Understanding the single_threaded Flag

The single_threaded flag is part of the compiler's feature-configuration system and influences both code generation and optimization:

graph TB
    subgraph "Code Generation Features"
        Features["Feature Flags"]
        Features --> UnwindTables["unwind_tables: bool"]
        Features --> StackProtector["stack_protector: bool"]
        Features --> StackCheck["stack_check: bool"]
        Features --> RedZone["red_zone: ?bool"]
        Features --> OmitFramePointer["omit_frame_pointer: bool"]
        Features --> Valgrind["valgrind: bool"]
        Features --> SingleThreaded["single_threaded: bool"]
        UnwindTables --> EHFrame["Generate .eh_frame<br/>for exception handling"]
        StackProtector --> CanaryCheck["Stack canary checks<br/>buffer overflow detection"]
        StackCheck --> ProbeStack["Stack probing<br/>prevents overflow"]
        RedZone --> RedZoneSpace["Red zone optimization<br/>(x86_64, AArch64)"]
        OmitFramePointer --> NoFP["Omit frame pointer<br/>for performance"]
        Valgrind --> ValgrindSupport["Valgrind client requests<br/>for memory debugging"]
        SingleThreaded --> NoThreading["Assume single-threaded<br/>enable optimizations"]
    end

single_threaded为true时,编译器假设没有对内存的并发访问,启用几种优化:原子操作可以降级为普通加载和存储(消除栅栏指令),线程本地存储变为常规全局变量,同步原语可以完全省略。此标志通过构建时的-Dsingle-threaded=true设置,并通过Compilation.Config流入代码生成。重要的是,这不仅仅是API限制——它从根本上改变了生成的代码。在单线程模式下编译的原子操作比多线程构建中的原子操作具有更弱的保证,因此你必须确保代码路径在两种模式下保持一致,以避免在切换标志时出现细微错误。

Restricting Thread Use at Compile Time

The guard below resets the atomic state machine, then either spawns a worker or runs the task inline depending on the build mode. Because the branch is resolved at compile time, a single-threaded build never instantiates Thread.spawn and the compile error is avoided entirely.

Zig
const std = @import("std");
const builtin = @import("builtin");

// Enum representing the possible states of task execution
// Uses explicit u8 backing to ensure consistent size across platforms
const TaskState = enum(u8) { idle, threaded_done, inline_done };

// Global atomic state tracking whether task ran inline or in a separate thread
// Atomics ensure thread-safe access even though single-threaded builds won't spawn threads
var task_state = std.atomic.Value(TaskState).init(.idle);

// Simulates a task that runs in a separate thread
// Includes a small delay to demonstrate asynchronous execution
fn threadedTask() void {
    std.Thread.sleep(1 * std.time.ns_per_ms);
    // Release ordering ensures all prior writes are visible to threads that acquire this value
    task_state.store(.threaded_done, .release);
}

// Simulates a task that runs inline in the main thread
// Used as fallback when threading is disabled at compile time
fn inlineTask() void {
    // Release ordering maintains consistency with the threaded path
    task_state.store(.inline_done, .release);
}

pub fn main() !void {
    // Set up buffered stdout writer for efficient output
    var stdout_buffer: [256]u8 = undefined;
    var stdout_state = std.fs.File.stdout().writer(&stdout_buffer);
    const out = &stdout_state.interface;

    // Reset state to idle with sequential consistency
    // seq_cst provides strongest ordering guarantees for initialization
    task_state.store(.idle, .seq_cst);

    // Check compile-time flag to determine execution strategy
    // builtin.single_threaded is true when compiled with -fsingle-threaded
    if (builtin.single_threaded) {
        try out.print("single-threaded build; running task inline\n", .{});
        // Execute task directly without spawning a thread
        inlineTask();
    } else {
        try out.print("multi-threaded build; spawning worker\n", .{});
        // Spawn separate thread to execute task concurrently
        var worker = try std.Thread.spawn(.{}, threadedTask, .{});
        // Block until worker thread completes
        worker.join();
    }

    // Acquire ordering ensures we observe all writes made before the release store
    const final_state = task_state.load(.acquire);
    
    // Convert enum state to human-readable string for output
    const label = switch (final_state) {
        .idle => "idle",
        .threaded_done => "threaded_done",
        .inline_done => "inline_done",
    };

    // Display final execution state and flush buffer to ensure output is visible
    try out.print("task state: {s}\n", .{label});
    try out.flush();
}
Run
Shell
$ zig run 03_single_thread_guard.zig
Output
Shell
multi-threaded build; spawning worker
task state: threaded_done

When you build with -Dsingle-threaded=true, the inline branch is the only one that gets compiled, so keep the logic symmetric and make sure shared state is still set through the same atomic helpers; otherwise the two modes drift apart semantically.

Notes and Caveats

  • Threads must be joined or detached exactly once; leaked handles exhaust resources. Thread.join consumes the handle, so store handles in a slice you can iterate over later.
  • Atomic operations act on raw memory. Never mix atomic and non-atomic access to the same location, even when you "know" the race cannot happen; wrap shared scalars in std.atomic.Value to keep the intent obvious.
  • Compare-and-swap loops can spin indefinitely; when a wait may last more than a few cycles, consider Thread.yield() or an event primitive such as Thread.ResetEvent, shown in the sketch after this list.
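
As a sketch of that last point, std.Thread.ResetEvent parks the waiter in the kernel instead of burning cycles in a CAS loop; the producer and payload names here are illustrative:

Zig

const std = @import("std");

var ready: std.Thread.ResetEvent = .{};
var payload: u32 = 0;

// Publishes a value, then wakes every thread blocked in ready.wait().
fn producer() void {
    std.Thread.sleep(1 * std.time.ns_per_ms);
    @atomicStore(u32, &payload, 123, .release);
    ready.set();
}

pub fn main() !void {
    const worker = try std.Thread.spawn(.{}, producer, .{});
    defer worker.join();

    // Blocks in the OS until set() is called; no spinning required.
    ready.wait();
    std.debug.print("payload = {d}\n", .{@atomicLoad(u32, &payload, .acquire)});
}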

Debugging Concurrent Code with ThreadSanitizer

Zig ships with built-in race detection via ThreadSanitizer, a powerful tool for finding data races, deadlocks, and other concurrency bugs:

Sanitizer         Config field          Purpose                    Requirements
ThreadSanitizer   any_sanitize_thread   Data race detection        LLVM backend
UBSan             any_sanitize_c        C undefined behavior       LLVM backend, C code
Fuzzing           any_fuzz              Fuzzing instrumentation    libFuzzer integration
graph TB
    subgraph "Sanitizer Configuration"
        Sanitizers["Sanitizer Flags"]
        Sanitizers --> TSan["any_sanitize_thread"]
        Sanitizers --> UBSan["any_sanitize_c"]
        Sanitizers --> Fuzz["any_fuzz"]
        TSan --> TSanLib["tsan_lib: ?CrtFile"]
        TSan --> TSanRuntime["ThreadSanitizer runtime<br/>linked into binary"]
        UBSan --> UBSanLib["ubsan_rt_lib: ?CrtFile<br/>ubsan_rt_obj: ?CrtFile"]
        UBSan --> UBSanRuntime["UBSan runtime<br/>C undefined behavior checks"]
        Fuzz --> FuzzLib["fuzzer_lib: ?CrtFile"]
        Fuzz --> FuzzRuntime["libFuzzer integration<br/>for fuzz testing"]
    end

Enable ThreadSanitizer with -Dsanitize-thread when building your program. TSan instruments every memory access and synchronization operation, tracking happens-before relationships to detect races. When it finds one, TSan prints a detailed report showing the conflicting accesses and their stack traces. The instrumentation adds significant runtime overhead (2-5x slowdown, 5-10x memory usage), so use it during development and testing, not in production. TSan is especially valuable for validating atomic code: even when your logic looks correct, TSan catches subtle ordering problems or missing synchronization. For this chapter's examples, try running them with -Dsanitize-thread to verify they are race-free; the parallel sum and atomic once patterns should pass cleanly, demonstrating correct synchronization.
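
The Run commands in this chapter invoke the compiler directly; in that mode the equivalent compile flag is -fsanitize-thread:

Shell
$ zig run -fsanitize-thread 01_parallel_even_sum.zig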

Exercises

  • Extend the parallel sum to accept a predicate callback so you can swap "is even" for any classification you like; measure how .acquire versus .monotonic loads affect contention.
  • Rework the callOnce demo to stage failures: have the initializer return !void and store the failure in an atomic slot so every caller can re-raise the same error consistently.
  • Introduce std.Thread.WaitGroup around the once-guard code so you can wait for an arbitrary number of workers without storing handles by hand.

Limitations, Alternatives, and Edge Cases

  • On platforms without pthreads or Win32 threads, Zig emits a compile error; when targeting WASI without threading support, plan a fallback to an event loop or async.
  • Atomics operate on plain integers and enums; for composite state, reach for a mutex (see the sketch after this list) or design arrays of atomics so updates cannot tear.
  • Single-threaded builds can still use atomics, but the instructions compile down to plain loads and stores. Keep code paths consistent so you never depend by accident on the stronger ordering of a multi-threaded build.
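
A minimal sketch of the mutex option for composite state; the Stats type and its fields are illustrative:

Zig

const std = @import("std");

// Two fields that must change together: separate atomics could tear,
// so a mutex guards the pair as a unit.
const Stats = struct {
    mutex: std.Thread.Mutex = .{},
    count: u64 = 0,
    total: u64 = 0,

    fn record(self: *Stats, value: u64) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.count += 1;
        self.total += value;
    }
};

fn worker(stats: *Stats) void {
    for (0..1000) |i| stats.record(i);
}

pub fn main() !void {
    var stats = Stats{};

    var threads: [4]std.Thread = undefined;
    for (&threads) |*t| {
        t.* = try std.Thread.spawn(.{}, worker, .{&stats});
    }
    for (threads) |t| t.join();

    // count is always 4000 and total is always 4 * (0 + 1 + ... + 999).
    std.debug.print("count={d} total={d}\n", .{ stats.count, stats.total });
}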

Platform-Specific Threading Constraints

Not every platform supports threads, and some have special requirements for thread-local storage:

graph TB
    subgraph "Threading Configuration"
        TARG["Target Platform"]
        TARG --> SINGLETHREAD["defaultSingleThreaded()<br/>WASM, Haiku"]
        TARG --> EMULATETLS["useEmulatedTls()<br/>OpenBSD, old Android"]
        SINGLETHREAD --> NOTHREAD["No thread support"]
        EMULATETLS --> TLSEMU["Software TLS"]
    end

Some targets default to single-threaded mode because they lack OS thread support: WebAssembly (without the threading feature flags) and Haiku OS both fall into this category. On these platforms, attempting to spawn a thread is a compile error unless you explicitly enable threading support in your build configuration. A related issue is thread-local storage (TLS): OpenBSD and older Android versions do not provide native TLS, so Zig uses emulated TLS, a slower but portable software implementation. When writing cross-platform concurrent code, check target.defaultSingleThreaded() and target.useEmulatedTls() to understand the platform's constraints. For WASI you can enable threads through the atomics and bulk-memory features plus the --import-memory --shared-memory linker flags, but not every WASI runtime supports this. Design for graceful degradation: provide a synchronous fallback keyed on builtin.single_threaded, and avoid assuming TLS is free on every platform.

Summary

  • std.Thread offers lightweight spawn/join semantics, but scheduling and cleanup remain your responsibility.
  • Atomic intrinsics such as @atomicLoad, @atomicStore, and @cmpxchgStrong make small lock-free state machines practical once you match orderings to your invariants.
  • Keying behavior on builtin.single_threaded lets shared components serve both single-threaded builds and multi-core deployments without forking the codebase.
