Skip to content

Commit

Permalink
Merge pull request #35 from acoustid/attributes
Browse files Browse the repository at this point in the history
Add support for attributes to assist external replication
  • Loading branch information
lalinsky authored Dec 2, 2024
2 parents e033ba6 + 72b3ec6 commit 5f32ee4
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 11 deletions.
4 changes: 2 additions & 2 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
.hash = "12201776681f1e5ec6df7df30786e90771c5de564c941a309c73c4299c7864ddb4c3",
},
.msgpack = .{
.url = "git+https://github.com/lalinsky/msgpack.zig?ref=main#7c0a9846b33063199e56e50d683b4ca8785c773e",
.hash = "12207a2d5cff5690049e70a0ce65c8a7b67bf385abc3acf86caa42db2a921c83a269",
.url = "git+https://github.com/lalinsky/msgpack.zig?ref=v0.1.0#d141ef4e1f585fecbbcdac9a9f85e41b5759182c",
.hash = "1220cb5fbd418638a830cb3c8a47d95d766d5ec1904631a14cde18cad89047165404",
},
},
.paths = .{
Expand Down
2 changes: 2 additions & 0 deletions src/FileSegment.zig
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub const Options = struct {
allocator: std.mem.Allocator,
dir: std.fs.Dir,
info: SegmentInfo = .{},
attributes: std.AutoHashMapUnmanaged(u64, u64) = .{},
docs: std.AutoHashMap(u32, bool),
index: std.ArrayList(u32),
block_size: usize = 0,
Expand All @@ -41,6 +42,7 @@ pub fn init(allocator: std.mem.Allocator, options: Options) Self {
}

pub fn deinit(self: *Self, delete_file: KeepOrDelete) void {
self.attributes.deinit(self.allocator);
self.docs.deinit();
self.index.deinit();

Expand Down
34 changes: 32 additions & 2 deletions src/Index.zig
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@ fn releaseSegments(self: *Self, segments: *SegmentsSnapshot) void {
FileSegmentList.destroySegments(self.allocator, &segments.file_segments);
}

/// Names of the snapshot fields that hold segment lists.
///
/// Used with `@field(snapshot, name)` inside `inline for` loops so the same
/// code runs over each list. The order here is the order in which the lists
/// are visited: file segments first, then memory segments.
const segment_lists = [_][]const u8{ "file_segments", "memory_segments" };

pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults {
const sorted_hashes = try allocator.dupe(u32, hashes);
defer allocator.free(sorted_hashes);
Expand All @@ -457,14 +462,39 @@ pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, de
var snapshot = self.acquireSegments();
defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread

try snapshot.file_segments.value.search(sorted_hashes, &results, deadline);
try snapshot.memory_segments.value.search(sorted_hashes, &results, deadline);
inline for (segment_lists) |n| {
const segments = @field(snapshot, n);
try segments.value.search(sorted_hashes, &results, deadline);
}

results.sort();

return results;
}

/// Collects the attributes of every segment in the current snapshot into one map.
///
/// Segment lists are visited in `segment_lists` order and each segment's
/// entries are written with `put`, so a key that appears in a later segment
/// replaces the value taken from an earlier one.
///
/// The returned map is allocated with `allocator`. It is released here only
/// when an error is returned; on success the caller is responsible for it.
pub fn getAttributes(self: *Self, allocator: std.mem.Allocator) !std.AutoHashMapUnmanaged(u64, u64) {
    var merged: std.AutoHashMapUnmanaged(u64, u64) = .{};
    errdefer merged.deinit(allocator);

    var snapshot = self.acquireSegments();
    defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread

    // Highest segment version seen so far; used only for the ordering check below.
    var prev_version: u64 = 0;
    inline for (segment_lists) |list_name| {
        const list = @field(snapshot, list_name);
        for (list.value.nodes.items) |node| {
            var it = node.value.attributes.iterator();
            while (it.next()) |kv| {
                const key = kv.key_ptr.*;
                const value = kv.value_ptr.*;
                try merged.put(allocator, key, value);
            }

            // Segments are expected to arrive in strictly increasing version order.
            const version = node.value.info.version;
            std.debug.assert(version > prev_version);
            prev_version = version;
        }
    }

    return merged;
}

// Reference index_tests.zig from a test block so that the tests declared in
// that file are included when this file's tests are built.
test {
_ = @import("index_tests.zig");
}
19 changes: 19 additions & 0 deletions src/MemorySegment.zig
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub const Options = struct {};

allocator: std.mem.Allocator,
info: SegmentInfo = .{},
attributes: std.AutoHashMapUnmanaged(u64, u64) = .{},
docs: std.AutoHashMap(u32, bool),
items: std.ArrayList(Item),
frozen: bool = false,
Expand All @@ -34,6 +35,8 @@ pub fn init(allocator: std.mem.Allocator, opts: Options) Self {

/// Releases the containers owned by this segment: the item list, the docs
/// map and the attributes map.
///
/// `delete_file` is accepted but not used by this implementation.
pub fn deinit(self: *Self, delete_file: KeepOrDelete) void {
    _ = delete_file;

    self.items.deinit();
    self.docs.deinit();
    // `attributes` is an unmanaged map, so the allocator is passed explicitly.
    self.attributes.deinit(self.allocator);
}
Expand All @@ -54,6 +57,7 @@ pub fn getSize(self: Self) usize {
}

pub fn build(self: *Self, changes: []const Change) !void {
var num_attributes: u32 = 0;
var num_docs: u32 = 0;
var num_items: usize = 0;
for (changes) |change| {
Expand All @@ -65,9 +69,13 @@ pub fn build(self: *Self, changes: []const Change) !void {
.delete => {
num_docs += 1;
},
.set_attribute => {
num_attributes += 1;
},
}
}

try self.attributes.ensureTotalCapacity(self.allocator, num_attributes);
try self.docs.ensureTotalCapacity(num_docs);
try self.items.ensureTotalCapacity(num_items);

Expand All @@ -92,6 +100,12 @@ pub fn build(self: *Self, changes: []const Change) !void {
result.value_ptr.* = false;
}
},
.set_attribute => |op| {
const result = self.attributes.getOrPutAssumeCapacity(op.key);
if (!result.found_existing) {
result.value_ptr.* = op.value;
}
},
}
}

Expand All @@ -103,8 +117,13 @@ pub fn cleanup(self: *Self) void {
}

pub fn merge(self: *Self, merger: *SegmentMerger(Self)) !void {
std.debug.assert(self.allocator.ptr == merger.allocator.ptr);

self.info = merger.segment.info;

self.attributes.deinit(self.allocator);
self.attributes = merger.segment.attributes.move();

self.docs.deinit();
self.docs = merger.segment.docs.move();

Expand Down
10 changes: 10 additions & 0 deletions src/change.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,19 @@ pub const Delete = struct {
}
};

/// Change payload that assigns `value` to the attribute identified by `key`.
pub const SetAttribute = struct {
    key: u64,
    value: u64,

    /// Describes how this struct is encoded by msgpack: as a map, with keys
    /// chosen by the `field_name_prefix = 1` setting (presumably a one-character
    /// prefix of each field name; confirm against the msgpack library).
    pub fn msgpackFormat() msgpack.StructFormat {
        const format: msgpack.StructFormat = .{
            .as_map = .{
                .key = .{ .field_name_prefix = 1 },
            },
        };
        return format;
    }
};

pub const Change = union(enum) {
insert: Insert,
delete: Delete,
set_attribute: SetAttribute,

pub fn msgpackFormat() msgpack.UnionFormat {
return .{ .as_map = .{ .key = .{ .field_name_prefix = 1 } } };
Expand Down
48 changes: 42 additions & 6 deletions src/filefmt.zig
Original file line number Diff line number Diff line change
Expand Up @@ -238,36 +238,57 @@ const segment_file_header_magic_v1: u32 = 0x53474D31; // "SGM1" in big endian
const segment_file_footer_magic_v1: u32 = @byteSwap(segment_file_header_magic_v1);

/// Header record written at the start of a segment file.
///
/// Every field is required when constructing the header; `magic` has no
/// default and must be supplied by the writer. The numeric keys returned by
/// `msgpackFieldKey` follow the declaration order of the fields.
pub const SegmentFileHeader = struct {
    /// Format identifier for the segment file.
    magic: u32,
    info: SegmentInfo,
    /// True when an attributes map follows the header in the file.
    has_attributes: bool,
    /// True when a docs map follows the header (after attributes, if present).
    has_docs: bool,
    block_size: u32,

    /// Encodes the header as a msgpack map; defaults are always written,
    /// null values are omitted.
    pub fn msgpackFormat() msgpack.StructFormat {
        return .{
            .as_map = .{
                .key = .field_index, // FIXME
                .omit_defaults = false,
                .omit_nulls = true,
            },
        };
    }

    /// Fixed numeric key for each header field.
    pub fn msgpackFieldKey(field: std.meta.FieldEnum(@This())) u8 {
        return switch (field) {
            .magic => 0x00,
            .info => 0x01,
            .has_attributes => 0x02,
            .has_docs => 0x03,
            .block_size => 0x04,
        };
    }
};

/// Footer record of a segment file.
///
/// Every field is required when constructing the footer; `magic` has no
/// default. The numeric keys returned by `msgpackFieldKey` follow the
/// declaration order of the fields.
pub const SegmentFileFooter = struct {
    /// Format identifier for the footer.
    magic: u32,
    num_items: u32,
    num_blocks: u32,
    checksum: u64,

    /// Encodes the footer as a msgpack map; defaults are always written,
    /// null values are omitted.
    pub fn msgpackFormat() msgpack.StructFormat {
        return .{
            .as_map = .{
                .key = .field_index, // FIXME
                .omit_defaults = false,
                .omit_nulls = true,
            },
        };
    }

    /// Fixed numeric key for each footer field.
    pub fn msgpackFieldKey(field: std.meta.FieldEnum(@This())) u8 {
        return switch (field) {
            .magic => 0x00,
            .num_items => 0x01,
            .num_blocks => 0x02,
            .checksum => 0x03,
        };
    }
};

pub fn deleteSegmentFile(dir: std.fs.Dir, info: SegmentInfo) !void {
Expand Down Expand Up @@ -299,11 +320,15 @@ pub fn writeSegmentFile(dir: std.fs.Dir, reader: anytype) !void {
const packer = msgpack.packer(writer);

const header = SegmentFileHeader{
.magic = segment_file_header_magic_v1,
.block_size = block_size,
.info = segment.info,
.has_attributes = true,
.has_docs = true,
};
try packer.write(SegmentFileHeader, header);

try packer.writeMap(segment.attributes);
try packer.writeMap(segment.docs);

try buffered_writer.flush();
Expand Down Expand Up @@ -393,7 +418,18 @@ pub fn readSegmentFile(dir: fs.Dir, info: SegmentInfo, segment: *FileSegment) !v
segment.info = header.info;
segment.block_size = header.block_size;

try unpacker.readMapInto(&segment.docs);
if (header.has_attributes) {
// FIXME nicer api in msgpack.zig
var attributes = std.AutoHashMap(u64, u64).init(segment.allocator);
defer attributes.deinit();
try unpacker.readMapInto(&attributes);
segment.attributes.deinit(segment.allocator);
segment.attributes = attributes.unmanaged.move();
}

if (header.has_docs) {
try unpacker.readMapInto(&segment.docs);
}

const block_size = header.block_size;
const padding_size = block_size - fixed_buffer_stream.pos % block_size;
Expand Down
14 changes: 14 additions & 0 deletions src/segment_merger.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const SharedPtr = @import("utils/shared_ptr.zig").SharedPtr;

/// Metadata carried alongside a merged segment: its info, its attribute map
/// and its docs map.
pub const MergedSegmentInfo = struct {
info: SegmentInfo = .{},
// Attribute key -> value pairs. This is an unmanaged map, so every
// allocating call and `deinit` must be given the allocator explicitly.
attributes: std.AutoHashMapUnmanaged(u64, u64) = .{},
// Managed map keyed by u32; it has no default, so it must be initialized
// by whoever constructs this struct.
docs: std.AutoHashMap(u32, bool),
};

Expand Down Expand Up @@ -76,16 +77,29 @@ pub fn SegmentMerger(comptime Segment: type) type {
return error.NoSources;
}

var total_attributes: u32 = 0;
var total_docs: u32 = 0;
for (sources, 0..) |source, i| {
if (i == 0) {
self.segment.info = source.reader.segment.info;
} else {
self.segment.info = SegmentInfo.merge(self.segment.info, source.reader.segment.info);
}
total_attributes += source.reader.segment.attributes.count();
total_docs += source.reader.segment.docs.count();
}

try self.segment.attributes.ensureTotalCapacity(self.allocator, total_attributes);
for (sources) |*source| {
const segment = source.reader.segment;
var iter = segment.attributes.iterator();
while (iter.next()) |entry| {
const key = entry.key_ptr.*;
const value = entry.value_ptr.*;
self.segment.attributes.putAssumeCapacity(key, value);
}
}

try self.segment.docs.ensureTotalCapacity(total_docs);
for (sources) |*source| {
const segment = source.reader.segment;
Expand Down
19 changes: 19 additions & 0 deletions src/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,35 @@ fn handleHeadIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !vo
return;
}

/// Wrapper around an attribute map that controls its JSON representation.
const Attributes = struct {
    attributes: std.AutoHashMapUnmanaged(u64, u64),

    /// Writes the map as a JSON array of two-element `[key, value]` arrays,
    /// in the map's iteration order.
    pub fn jsonStringify(self: Attributes, jws: anytype) !void {
        try jws.beginArray();

        var it = self.attributes.iterator();
        while (it.next()) |kv| {
            const key = kv.key_ptr.*;
            const value = kv.value_ptr.*;

            // One inner array per entry: [key, value].
            try jws.beginArray();
            try jws.write(key);
            try jws.write(value);
            try jws.endArray();
        }

        try jws.endArray();
    }
};

/// Response body produced by `handleGetIndex`.
const GetIndexResponse = struct {
// Status string; `handleGetIndex` sets it to "ok".
status: []const u8,
// Attribute map wrapper; `Attributes` defines its own `jsonStringify`.
attributes: Attributes,
};

/// Handles a GET request for an index: looks the index up, gathers its
/// attributes and replies with a JSON `GetIndexResponse`.
///
/// Returns without writing a body here when `getIndex` yields null. The
/// attribute map is allocated from the request arena.
fn handleGetIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {
    const index_ref = try getIndex(ctx, req, res, true) orelse return;
    defer releaseIndex(ctx, index_ref);

    const body: GetIndexResponse = .{
        .status = "ok",
        .attributes = .{
            .attributes = try index_ref.index.getAttributes(req.arena),
        },
    };
    return res.json(&body, .{});
}
Expand Down
2 changes: 1 addition & 1 deletion src/utils/shared_ptr.zig
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub fn RefCounter(comptime T: type) type {

/// Creates a counter whose reference count starts at 1.
pub fn init() Self {
    return .{
        // Use the enclosing generic's integer type `T` for the atomic value
        // instead of a fixed `u32`, and initialize the field exactly once.
        .refs = std.atomic.Value(T).init(1),
    };
}

Expand Down

0 comments on commit 5f32ee4

Please sign in to comment.