概述
有了前一章的并发原语,我们将构建一个小而实用的工具:一个并行词频计数器,它可以读取文件,将其分片为沿空白字符的连续段,启动工作线程进行标记化和统计,然后将线程本地映射合并为最终频率表。参见29、Thread.zig和atomic.zig。
为什么选择这个项目?它锻炼了常见的系统模式——工作分解、避免错误共享、字符串键的所有权和内存顺序约束——而不淹没在样板代码中。结果是一个你可以适应的健壮骨架,用于日志压缩、类grep索引或轻量级分析。
学习目标
- 在尊重标记边界的同时将输入分区为分片。
- 安全使用
std.Thread.spawn,并在单线程构建中回退到内联执行。 - 维护每线程
std.StringHashMap实例并在不悬挂指针的情况下合并它们。 - 通过对键/值对向量排序来呈现确定性的"Top N"。
项目布局和构建
我们将此示例保持为带有本地构建的小包。0.15.2构建API构造显式模块并将其传递给addExecutable——注意root_module字段而不是旧的root_source_file。
const std = @import("std");
/// Build script for the parallel wordcount project.
/// Configures and compiles the executable with standard build options.
pub fn build(b: *std.Build) void {
// Parse target triple from command line (--target flag)
const target = b.standardTargetOptions(.{});
// Parse optimization level from command line (-Doptimize flag)
const optimize = b.standardOptimizeOption(.{});
// Create a module representing our application's entry point.
// In Zig 0.15.2, modules are explicitly created before being passed to executables.
const root = b.createModule(.{
.root_source_file = b.path("src/main.zig"),
.target = target,
.optimize = optimize,
});
// Define the executable artifact, linking it to the root module.
const exe = b.addExecutable(.{
.name = "parallel-wc",
.root_module = root,
});
// Register the executable to be installed in zig-out/bin
b.installArtifact(exe);
// Create a run command that executes the compiled binary
const run_cmd = b.addRunArtifact(exe);
// Forward any arguments passed after '--' to the executable
if (b.args) |args| run_cmd.addArgs(args);
// Define a 'run' step that users can invoke with 'zig build run'
const run_step = b.step("run", "Run parallel wordcount");
run_step.dependOn(&run_cmd.step);
}
参见Build.zig。
实现
程序将整个文件读入内存(受合理上限限制),在空白处创建分片边界,然后启动N个工作器(N=CPU数量,除非在单线程模式下)。每个工作器标记化并将ASCII小写,剥离标点符号,并插入自己的映射中,该映射由arena支持以避免每个标记释放。在合并时,我们将键复制到最终映射的分配器中,以便析构arena不会使键无效。
const std = @import("std");
const builtin = @import("builtin");
/// Map type alias: word → frequency count
const Map = std.StringHashMap(u64);
/// Normalize a raw token by converting ASCII letters to lowercase and
/// stripping non-alphanumeric characters from both ends.
/// Returns a slice into the provided buffer; caller owns the buffer.
fn normalizeWord(allocator: std.mem.Allocator, raw: []const u8) ![]const u8 {
// Allocate a buffer large enough to hold the entire input
var buf = try allocator.alloc(u8, raw.len);
var n: usize = 0;
// Convert uppercase ASCII to lowercase (A-Z → a-z)
for (raw) |c| {
var ch = c;
if (ch >= 'A' and ch <= 'Z') ch = ch + 32;
buf[n] = ch;
n += 1;
}
// Strip leading non-alphanumeric characters
var start: usize = 0;
while (start < n and (buf[start] < '0' or (buf[start] > '9' and buf[start] < 'a') or buf[start] > 'z')) : (start += 1) {}
// Strip trailing non-alphanumeric characters
var end: usize = n;
while (end > start and (buf[end - 1] < '0' or (buf[end - 1] > '9' and buf[end - 1] < 'a') or buf[end - 1] > 'z')) : (end -= 1) {}
// If nothing remains after stripping, return empty slice
if (end <= start) return buf[0..0];
return buf[start..end];
}
/// Tokenize text on whitespace and populate the provided map with
/// normalized word frequencies. Keys are normalized copies allocated
/// from the provided allocator.
fn tokenizeAndCount(allocator: std.mem.Allocator, text: []const u8, map: *Map) !void {
// Split on any whitespace character
var it = std.mem.tokenizeAny(u8, text, " \t\r\n");
while (it.next()) |raw| {
const word = try normalizeWord(allocator, raw);
if (word.len == 0) continue; // skip empty tokens
// Insert or update the word count
const gop = try map.getOrPut(word);
if (!gop.found_existing) {
gop.value_ptr.* = 1;
} else {
gop.value_ptr.* += 1;
}
}
}
/// Arguments passed to each worker thread
const WorkerArgs = struct {
slice: []const u8, // segment of text to process
counts: *Map, // thread-local frequency map
arena: *std.heap.ArenaAllocator, // arena for temporary allocations
};
/// Worker function executed by each thread; tokenizes and counts words
/// in its assigned text segment without shared state.
fn countWorker(args: WorkerArgs) void {
// Each worker writes only to its own map instance; merge happens later
tokenizeAndCount(args.arena.allocator(), args.slice, args.counts) catch |err| {
std.debug.print("worker error: {s}\n", .{@errorName(err)});
};
}
/// Read an entire file into a newly allocated buffer, capped at 64 MiB.
fn readAllAlloc(path: []const u8, allocator: std.mem.Allocator) ![]u8 {
var file = try std.fs.cwd().openFile(path, .{});
defer file.close();
return try file.readToEndAlloc(allocator, 64 * 1024 * 1024);
}
/// Partition text into roughly equal segments, ensuring shard boundaries
/// fall at whitespace to avoid splitting words. Returns owned slice of slices.
fn shard(text: []const u8, shards: usize, allocator: std.mem.Allocator) ![]const []const u8 {
// If only one shard requested or text is empty, return single segment
if (shards <= 1 or text.len == 0) {
var single = try allocator.alloc([]const u8, 1);
single[0] = text;
return single;
}
const approx = text.len / shards; // approximate bytes per shard
var parts = std.array_list.Managed([]const u8).init(allocator);
defer parts.deinit();
var i: usize = 0;
while (i < text.len) {
var end = @min(text.len, i + approx);
// Push shard boundary forward to the next whitespace character
while (end < text.len and text[end] != ' ' and text[end] != '\n' and text[end] != '\t' and text[end] != '\r') : (end += 1) {}
// If no whitespace found, fall back to approximate boundary
if (end == i) end = @min(text.len, i + approx);
try parts.append(text[i..end]);
i = end;
}
return try parts.toOwnedSlice();
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
// Set up buffered stdout for efficient printing
var stdout_buf: [1024]u8 = undefined;
var stdout_state = std.fs.File.stdout().writer(&stdout_buf);
const out = &stdout_state.interface;
// Parse command-line arguments
var args_it = try std.process.argsWithAllocator(allocator);
defer args_it.deinit();
_ = args_it.next(); // skip program name
const path = args_it.next() orelse {
try out.print("usage: parallel-wc <file>\n", .{});
try out.flush();
return;
};
// Read entire file into memory
const text = try readAllAlloc(path, allocator);
defer allocator.free(text);
// Determine shard count: use CPU count unless single-threaded build
const cpu = std.Thread.getCpuCount() catch 1;
const shard_count = if (builtin.single_threaded) 1 else if (cpu < 1) 1 else cpu;
// Partition text into shards at whitespace boundaries
const parts = try shard(text, shard_count, allocator);
defer allocator.free(parts);
// Allocate per-shard arenas and maps
var arenas = try allocator.alloc(std.heap.ArenaAllocator, parts.len);
defer allocator.free(arenas);
var maps = try allocator.alloc(Map, parts.len);
defer allocator.free(maps);
// Allocate thread handles if multi-threaded
var threads = if (builtin.single_threaded) &[_]std.Thread{} else try allocator.alloc(std.Thread, parts.len);
defer if (!builtin.single_threaded) allocator.free(threads);
// Spawn worker threads (or execute inline if single-threaded)
for (parts, 0..) |seg, i| {
arenas[i] = std.heap.ArenaAllocator.init(allocator);
maps[i] = Map.init(allocator);
try maps[i].ensureTotalCapacity(1024); // pre-size to reduce rehashing
if (builtin.single_threaded) {
// Execute worker inline
countWorker(.{ .slice = seg, .counts = &maps[i], .arena = &arenas[i] });
} else {
// Spawn a thread for this shard
threads[i] = try std.Thread.spawn(.{}, countWorker, .{WorkerArgs{ .slice = seg, .counts = &maps[i], .arena = &arenas[i] }});
}
}
// Wait for all threads to complete
if (!builtin.single_threaded) {
for (threads) |t| t.join();
}
// Merge per-thread maps into a single global map
var total = Map.init(allocator);
defer total.deinit();
try total.ensureTotalCapacity(4096); // pre-size for merged data
for (maps, 0..) |*m, i| {
var it = m.iterator();
while (it.next()) |e| {
const key_bytes = e.key_ptr.*;
// Duplicate key into total's allocator to take ownership,
// since arenas will be freed shortly
const dup = try allocator.dupe(u8, key_bytes);
const gop = try total.getOrPut(dup);
if (!gop.found_existing) {
gop.value_ptr.* = e.value_ptr.*;
} else {
// Key already exists; free the duplicate and accumulate count
allocator.free(dup);
gop.value_ptr.* += e.value_ptr.*;
}
}
// Free per-thread arena and map
arenas[i].deinit();
m.deinit();
}
// Build a sortable list of (word, count) entries
const Entry = struct { k: []const u8, v: u64 };
var entries = std.array_list.Managed(Entry).init(allocator);
defer entries.deinit();
var it = total.iterator();
while (it.next()) |e| {
try entries.append(.{ .k = e.key_ptr.*, .v = e.value_ptr.* });
}
// Sort by count descending, then alphabetically
std.sort.pdq(Entry, entries.items, {}, struct {
fn lessThan(_: void, a: Entry, b: Entry) bool {
if (a.v == b.v) return std.mem.lessThan(u8, a.k, b.k);
return a.v > b.v; // descending by count
}
}.lessThan);
// Print top 10 most frequent words
const to_show = @min(entries.items.len, 10);
try out.print("top {d} words in {d} shards:\n", .{ to_show, parts.len });
for (entries.items[0..to_show]) |e| {
try out.print("{s} {d}\n", .{ e.k, e.v });
}
// Free duplicated keys now that we are done with the map
var free_it = total.iterator();
while (free_it.next()) |e| allocator.free(e.key_ptr.*);
try out.flush();
}
$ zig build --build-file chapters-data/code/30__project-parallel-wordcount/build.zig run -- chapters-data/code/30__project-parallel-wordcount/data/lines.txttop 10 words in 8 shards:
and 2
i 2
little 2
me 2
the 2
a 1
about 1
ago—never 1
call 1
how 1StringHashMap按引用存储字符串切片;它不复制字节。当合并指向短命arena的映射时,将键字节复制到目标分配器,然后在完成后释放它们。示例在退出前迭代并释放键。
注意事项与限制
- 分片推动分片结束到下一个空白处以避免在单词中间分割标记。这意味着分片可能不均匀;对于I/O绑定工具来说,这是可以的。
- 示例小写ASCII并粗略剥离标点符号以专注于线程。如果需要Unicode分割,请集成
std.unicode和更忠实的规范化。 - 在单线程构建(
-Dsingle-threaded=true)中,我们内联执行工作器并完全跳过生成,镜像第29章的模式。
练习
- 添加
-n <N>以打印前N个词,使用std.process.argsWithAllocator解析标志。 - 将合并阶段切换为并行归约:每CPU成对合并直到剩下一张映射;测量可扩展性。
- 将arena替换为通过
file size / shards调整大小的bump分配器,并推理碎片化与峰值占用空间。
总结
本项目提炼出从磁盘上的字节到排序频率表的实用快速路径,同时遵守Zig的所有权和线程模型。它整合了分片、每线程映射和安全合并——一个为更大管道准备的最小模板。