Page MenuHomePhorge

fetcher.zig
No OneTemporary

Size
55 KB
Referenced Files
None
Subscribers
None

fetcher.zig

const std = @import("std");
const c_allocator = std.heap.c_allocator;
const debug = @import("debug.zig");
const extractor = @import("extractor.zig");
const uv = @import("uv.zig");
const tlsuv = @import("tlsuv.zig");
const nghttp2 = @import("nghttp2.zig");
const config = @import("config");
const user_agent: [:0]const u8 = "ant/" ++ config.version;
pub const FetchError = error{
ConnectionFailed,
TlsError,
Http2Error,
Timeout,
InvalidUrl,
ResponseError,
OutOfMemory,
};
pub const ParsedUrl = struct {
scheme: []const u8,
host: []const u8,
port: u16,
path: []const u8,
pub fn parse(url: []const u8) !ParsedUrl {
var remaining = url;
const scheme_end = std.mem.indexOf(u8, remaining, "://") orelse return error.InvalidUrl;
const scheme = remaining[0..scheme_end];
remaining = remaining[scheme_end + 3 ..];
const path_start = std.mem.indexOf(u8, remaining, "/") orelse remaining.len;
const host_port = remaining[0..path_start];
remaining = if (path_start < remaining.len) remaining[path_start..] else "/";
var host: []const u8 = host_port;
var port: u16 = if (std.mem.eql(u8, scheme, "https")) 443 else 80;
if (std.mem.indexOf(u8, host_port, ":")) |colon| {
host = host_port[0..colon];
port = std.fmt.parseInt(u16, host_port[colon + 1 ..], 10) catch return error.InvalidUrl;
}
return .{ .scheme = scheme, .host = host, .port = port, .path = remaining };
}
};
pub const StreamHandler = struct {
on_data: *const fn ([]const u8, ?*anyopaque) void,
on_complete: *const fn (u16, ?*anyopaque) void,
on_error: *const fn (FetchError, ?*anyopaque) void,
user_data: ?*anyopaque,
pub fn init(
on_data: *const fn ([]const u8, ?*anyopaque) void,
on_complete: *const fn (u16, ?*anyopaque) void,
on_error: *const fn (FetchError, ?*anyopaque) void,
user_data: ?*anyopaque,
) StreamHandler {
return .{ .on_data = on_data, .on_complete = on_complete, .on_error = on_error, .user_data = user_data };
}
};
const PendingRequest = struct {
url: []const u8,
handler: ?StreamHandler,
};
const MAX_PENDING_REQUESTS = 20;
const NUM_CONNECTIONS = 6;
const NUM_META_CONNECTIONS = 3;
const META_SLOW_LOG_MS: u64 = 250;
const Http2Client = struct {
allocator: std.mem.Allocator,
loop: *uv.loop_t,
tls: tlsuv.stream_t,
h2_session: ?*nghttp2.session,
host: [:0]const u8,
use_tls: bool,
connected: i32,
connect_pending: bool,
closing: bool,
write_buf: std.ArrayListUnmanaged(u8),
requests: [MAX_PENDING_REQUESTS]RequestState,
request_count: usize,
requests_done: usize,
last_response_status_code: u16,
const RequestState = struct {
stream_id: i32,
path: ?[:0]const u8,
on_data: ?*const fn ([]const u8, ?*anyopaque) void,
on_complete: ?*const fn (u16, ?*anyopaque) void,
on_error: ?*const fn (FetchError, ?*anyopaque) void,
userdata: ?*anyopaque,
response_body: std.ArrayListUnmanaged(u8),
status_code: u16,
done: bool,
has_error: bool,
start_ns: u64,
end_ns: u64,
bytes: usize,
content_encoding: ContentEncoding,
};
const ContentEncoding = enum {
identity,
gzip,
};
const alpn_protocols = [_][*:0]const u8{ "h2", "http/1.1" };
pub fn init(allocator: std.mem.Allocator, host: []const u8, use_tls: bool) !*Http2Client {
const client = try allocator.create(Http2Client);
errdefer allocator.destroy(client);
const host_z = try allocator.dupeZ(u8, host);
errdefer allocator.free(host_z);
client.* = .{
.allocator = allocator,
.loop = uv.uv_default_loop(),
.tls = .{},
.h2_session = null,
.host = host_z,
.use_tls = use_tls,
.connected = 0,
.connect_pending = false,
.closing = false,
.write_buf = .{},
.requests = undefined,
.request_count = 0,
.requests_done = 0,
.last_response_status_code = 0,
};
for (&client.requests) |*req| {
req.* = .{
.stream_id = 0,
.path = null,
.on_data = null,
.on_complete = null,
.on_error = null,
.userdata = null,
.response_body = .{},
.status_code = 0,
.done = false,
.has_error = false,
.start_ns = 0,
.end_ns = 0,
.bytes = 0,
.content_encoding = .identity,
};
}
if (tlsuv.tlsuv_stream_init(client.loop, &client.tls, null) != 0) {
allocator.free(host_z);
allocator.destroy(client);
return error.ConnectionFailed;
}
_ = tlsuv.tlsuv_stream_set_hostname(&client.tls, host_z.ptr);
_ = tlsuv.tlsuv_stream_set_protocols(&client.tls, 2, &alpn_protocols);
return client;
}
pub fn deinit(self: *Http2Client) void {
self.closing = true;
self.connect_pending = false;
for (&self.requests) |*req| {
req.on_data = null;
req.on_complete = null;
req.on_error = null;
req.userdata = null;
}
if (self.connected > 0) {
self.tls.data = self;
_ = tlsuv.tlsuv_stream_close(&self.tls, onStreamClose);
while (self.connected > 0) _ = uv.uv_run(self.loop, uv.RUN_ONCE);
}
if (self.h2_session) |session| nghttp2.nghttp2_session_del(session);
for (&self.requests) |*req| {
if (req.stream_id != -1) {
if (req.path) |p| self.allocator.free(p);
req.response_body.deinit(self.allocator);
}
}
self.write_buf.deinit(self.allocator);
self.allocator.free(self.host);
self.allocator.destroy(self);
}
pub fn resetRequests(self: *Http2Client) void {
for (self.requests[0..self.request_count]) |*req| {
if (req.stream_id != -1) {
if (req.path) |p| self.allocator.free(p);
req.response_body.deinit(self.allocator);
}
req.* = .{
.stream_id = 0,
.path = null,
.on_data = null,
.on_complete = null,
.on_error = null,
.userdata = null,
.response_body = .{},
.status_code = 0,
.done = false,
.has_error = false,
.start_ns = 0,
.end_ns = 0,
.bytes = 0,
.content_encoding = .identity,
};
}
self.request_count = 0;
self.requests_done = 0;
}
pub fn hasCapacity(self: *const Http2Client) bool {
for (self.requests[0..self.request_count]) |req| {
if (req.stream_id == -1) return true;
}
return self.request_count < MAX_PENDING_REQUESTS - 1;
}
pub fn recycleCompletedRequests(self: *Http2Client) void {
if (self.requests_done == 0) return;
for (self.requests[0..self.request_count]) |*req| {
if (req.done and req.stream_id != -1) {
if (req.path) |p| self.allocator.free(p);
req.response_body.deinit(self.allocator);
req.path = null;
req.response_body = .{};
req.stream_id = -1;
}
}
}
fn findOrAllocSlot(self: *Http2Client) ?*RequestState {
for (self.requests[0..self.request_count]) |*req| {
if (req.stream_id == -1) return req;
}
if (self.request_count < MAX_PENDING_REQUESTS) {
const req = &self.requests[self.request_count];
self.request_count += 1;
return req;
}
return null;
}
fn onStreamClose(handle: *uv.handle_t) callconv(.c) void {
const tls: *tlsuv.stream_t = @ptrCast(@alignCast(handle));
const client: *Http2Client = @ptrCast(@alignCast(tls.data));
client.connected = -2;
client.connect_pending = false;
}
fn findRequest(self: *Http2Client, stream_id: i32) ?*RequestState {
for (self.requests[0..self.request_count]) |*req| if (req.stream_id == stream_id) return req;
return null;
}
fn h2Send(_: ?*nghttp2.session, data: [*c]const u8, len: usize, _: c_int, ud: ?*anyopaque) callconv(.c) isize {
const client: *Http2Client = @ptrCast(@alignCast(ud));
client.write_buf.appendSlice(client.allocator, data[0..len]) catch return nghttp2.ERR_NOMEM;
return @intCast(len);
}
fn h2FrameRecv(_: ?*nghttp2.session, frame: *const nghttp2.frame, ud: ?*anyopaque) callconv(.c) c_int {
const client: *Http2Client = @ptrCast(@alignCast(ud));
if (frame.hd.flags & nghttp2.FLAG_END_STREAM != 0) {
if (client.findRequest(frame.hd.stream_id)) |req| {
if (!req.done) {
req.done = true;
req.end_ns = @intCast(std.time.nanoTimestamp());
client.requests_done += 1;
if (req.on_complete) |cb| cb(req.status_code, req.userdata);
}
}
}
return 0;
}
fn h2DataChunk(session: ?*nghttp2.session, _: u8, stream_id: i32, data: [*c]const u8, len: usize, ud: ?*anyopaque) callconv(.c) c_int {
const client: *Http2Client = @ptrCast(@alignCast(ud));
const req = client.findRequest(stream_id) orelse return 0;
if (req.on_data) |cb| cb(data[0..len], req.userdata) else req.response_body.appendSlice(client.allocator, data[0..len]) catch {
req.has_error = true;
}; req.bytes += len;
if (session) |s| _ = nghttp2.nghttp2_session_consume(s, stream_id, len);
return 0;
}
fn h2Header(_: ?*nghttp2.session, frame: *const nghttp2.frame, name: [*c]const u8, namelen: usize, value: [*c]const u8, valuelen: usize, _: u8, ud: ?*anyopaque) callconv(.c) c_int {
const client: *Http2Client = @ptrCast(@alignCast(ud));
if (frame.hd.type != nghttp2.HEADERS) return 0;
const req = client.findRequest(frame.hd.stream_id) orelse return 0;
if (namelen == 7 and std.mem.eql(u8, name[0..7], ":status"))
req.status_code = std.fmt.parseInt(u16, value[0..valuelen], 10) catch 0;
if (std.mem.eql(u8, name[0..namelen], "content-encoding")) {
if (std.mem.startsWith(u8, value[0..valuelen], "gzip")) {
req.content_encoding = .gzip;
}
}
return 0;
}
fn h2StreamClose(_: ?*nghttp2.session, stream_id: i32, error_code: u32, ud: ?*anyopaque) callconv(.c) c_int {
const client: *Http2Client = @ptrCast(@alignCast(ud));
const req = client.findRequest(stream_id) orelse return 0;
if (!req.done) {
req.done = true;
req.end_ns = @intCast(std.time.nanoTimestamp());
client.requests_done += 1;
if (error_code != 0) {
req.has_error = true;
if (req.on_error) |cb| cb(FetchError.Http2Error, req.userdata);
} else if (req.on_complete) |cb| cb(req.status_code, req.userdata);
}
return 0;
}
fn initH2(self: *Http2Client) !void {
var callbacks: *nghttp2.session_callbacks = undefined;
if (nghttp2.nghttp2_session_callbacks_new(&callbacks) != 0) return error.Http2Error;
defer nghttp2.nghttp2_session_callbacks_del(callbacks);
nghttp2.nghttp2_session_callbacks_set_send_callback2(callbacks, h2Send);
nghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks, h2FrameRecv);
nghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks, h2DataChunk);
nghttp2.nghttp2_session_callbacks_set_on_header_callback(callbacks, h2Header);
nghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(callbacks, h2StreamClose);
var session: *nghttp2.session = undefined;
if (nghttp2.nghttp2_session_client_new(&session, callbacks, self) != 0) return error.Http2Error;
self.h2_session = session;
var settings = [_]nghttp2.settings_entry{
.{ .settings_id = nghttp2.SETTINGS_MAX_CONCURRENT_STREAMS, .value = MAX_PENDING_REQUESTS },
.{ .settings_id = nghttp2.SETTINGS_INITIAL_WINDOW_SIZE, .value = 16 * 1024 * 1024 },
};
if (nghttp2.nghttp2_submit_settings(self.h2_session.?, nghttp2.FLAG_NONE, &settings, settings.len) != 0) return error.Http2Error;
const conn_window_increase: i32 = (16 * 1024 * 1024) - 65535;
_ = nghttp2.nghttp2_submit_window_update(self.h2_session.?, nghttp2.FLAG_NONE, 0, conn_window_increase);
}
fn flush(self: *Http2Client) !void {
if (self.closing) return error.ConnectionFailed;
if (self.h2_session) |session| while (nghttp2.nghttp2_session_want_write(session) != 0) if (nghttp2.nghttp2_session_send(session) != 0) break;
if (self.write_buf.items.len > 0) {
const data = try self.allocator.dupe(u8, self.write_buf.items);
self.write_buf.clearRetainingCapacity();
const wr = try self.allocator.create(uv.write_t);
wr.data = data.ptr;
var buf = uv.buf_t{ .base = data.ptr, .len = data.len };
if (tlsuv.tlsuv_stream_write(wr, &self.tls, &buf, onWrite) != 0) {
self.allocator.free(data);
self.allocator.destroy(wr);
return error.ConnectionFailed;
}
}
}
fn onWrite(wr: *uv.write_t, _: c_int) callconv(.c) void {
const data_ptr: [*]u8 = @ptrCast(wr.data);
std.c.free(data_ptr);
std.c.free(@ptrCast(wr));
}
fn allocBuf(_: *uv.handle_t, size: usize, buf: *uv.buf_t) callconv(.c) void {
buf.base = @ptrCast(std.c.malloc(size) orelse return);
buf.len = size;
}
fn onRead(stream: *uv.stream_t, nread: isize, buf: *const uv.buf_t) callconv(.c) void {
const tls: *tlsuv.stream_t = @ptrCast(@alignCast(stream));
const client: *Http2Client = @ptrCast(@alignCast(tls.data));
defer if (buf.base) |b| std.c.free(b);
if (client.closing) return;
if (nread < 0) {
for (client.requests[0..client.request_count]) |*req| if (!req.done) {
req.done = true;
req.has_error = true;
client.requests_done += 1;
if (req.on_error) |cb| cb(FetchError.ConnectionFailed, req.userdata);
};
return;
}
if (nread > 0 and client.h2_session != null) {
_ = nghttp2.nghttp2_session_mem_recv(client.h2_session.?, @ptrCast(buf.base), @intCast(nread));
client.flush() catch {};
}
}
fn onConnect(req: *uv.connect_t, status: c_int) callconv(.c) void {
const ctx: *ConnectCtx = @ptrCast(@alignCast(req.data));
defer ctx.client.allocator.destroy(ctx);
ctx.client.connect_pending = false;
if (ctx.client.closing) {
ctx.client.connected = -1;
_ = tlsuv.tlsuv_stream_close(&ctx.client.tls, onStreamClose);
return;
}
if (status < 0) {
ctx.client.connected = -1;
return;
}
ctx.client.connected = 1;
ctx.client.tls.data = ctx.client;
ctx.client.initH2() catch {
ctx.client.connected = -1;
return;
};
_ = tlsuv.tlsuv_stream_read_start(&ctx.client.tls, allocBuf, onRead);
ctx.client.flush() catch {};
}
const ConnectCtx = struct { client: *Http2Client, req: uv.connect_t };
fn ensureConnected(self: *Http2Client) !void {
if (self.closing) return error.ConnectionFailed;
if (self.connected > 0) return;
if (self.connected < 0) return error.ConnectionFailed;
var conn_start: u64 = @intCast(std.time.nanoTimestamp());
if (!self.connect_pending) {
const ctx = try self.allocator.create(ConnectCtx);
ctx.* = .{ .client = self, .req = .{} };
ctx.req.data = ctx;
if (tlsuv.tlsuv_stream_connect(&ctx.req, &self.tls, self.host.ptr, if (self.use_tls) 443 else 80, onConnect) != 0) {
self.allocator.destroy(ctx);
return error.ConnectionFailed;
}
self.connect_pending = true;
}
var loop_count: u32 = 0;
while (self.connected == 0) {
_ = uv.uv_run(self.loop, uv.RUN_ONCE);
loop_count += 1;
}
conn_start = debug.timer(" h2: tls connect", conn_start);
debug.log(" h2: connect loop iterations={d}", .{loop_count});
if (self.connected < 0) return error.ConnectionFailed;
}
pub fn initiateConnectAsync(self: *Http2Client) !void {
if (self.closing) return error.ConnectionFailed;
if (self.connected > 0) return;
if (self.connected < 0) return error.ConnectionFailed;
if (self.connect_pending) return;
const ctx = try self.allocator.create(ConnectCtx);
ctx.* = .{ .client = self, .req = .{} };
ctx.req.data = ctx;
if (tlsuv.tlsuv_stream_connect(&ctx.req, &self.tls, self.host.ptr, if (self.use_tls) 443 else 80, onConnect) != 0) {
self.allocator.destroy(ctx);
return error.ConnectionFailed;
}
self.connect_pending = true;
}
fn makeNv(name: [:0]const u8, value: [:0]const u8) nghttp2.nv {
return .{
.name = @constCast(name.ptr),
.value = @constCast(value.ptr),
.namelen = name.len, .valuelen = value.len, .flags = nghttp2.NV_FLAG_NONE
};
}
pub fn get(self: *Http2Client, path: []const u8, allocator: std.mem.Allocator) ![]u8 {
return self.getWithAccept(path, "application/json", allocator);
}
pub fn getWithAccept(self: *Http2Client, path: []const u8, accept: [:0]const u8, allocator: std.mem.Allocator) ![]u8 {
try self.ensureConnected();
if (self.request_count >= MAX_PENDING_REQUESTS) self.resetRequests();
const req = &self.requests[self.request_count]; self.request_count += 1;
req.* = .{
.stream_id = 0,
.path = try self.allocator.dupeZ(u8, path),
.on_data = null,
.on_complete = null,
.on_error = null,
.userdata = null,
.response_body = .{},
.status_code = 0,
.done = false,
.has_error = false,
.start_ns = @intCast(std.time.nanoTimestamp()),
.end_ns = 0,
.bytes = 0,
.content_encoding = .identity,
};
const session = self.h2_session orelse return error.Http2Error;
var hdrs = [_]nghttp2.nv{
makeNv(":method", "GET"),
makeNv(":path", req.path.?),
makeNv(":scheme", "https"),
makeNv(":authority", self.host),
makeNv("accept", accept),
makeNv("user-agent", user_agent)
};
const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
if (sid < 0) {
self.request_count -= 1;
if (req.path) |p| self.allocator.free(p);
return error.Http2Error;
}
req.stream_id = sid;
try self.flush();
while (!req.done) {
_ = uv.uv_run(self.loop, uv.RUN_ONCE);
try self.flush();
}
self.last_response_status_code = req.status_code;
if (req.has_error or req.status_code != 200) return error.ResponseError;
return try allocator.dupe(u8, req.response_body.items);
}
pub fn getStream(self: *Http2Client, path: []const u8, on_data: *const fn ([]const u8, ?*anyopaque) void, on_complete: *const fn (u16, ?*anyopaque) void, on_error: *const fn (FetchError, ?*anyopaque) void, userdata: ?*anyopaque) !void {
try self.ensureConnected();
const req = self.findOrAllocSlot() orelse return error.OutOfMemory;
req.* = .{
.stream_id = 0,
.path = try self.allocator.dupeZ(u8, path),
.on_data = on_data,
.on_complete = on_complete,
.on_error = on_error,
.userdata = userdata,
.response_body = .{},
.status_code = 0,
.done = false,
.has_error = false,
.start_ns = @intCast(std.time.nanoTimestamp()),
.end_ns = 0,
.bytes = 0,
.content_encoding = .identity
};
const session = self.h2_session orelse return error.Http2Error;
var hdrs = [_]nghttp2.nv{
makeNv(":method", "GET"),
makeNv(":path", req.path.?),
makeNv(":scheme", "https"),
makeNv(":authority", self.host),
makeNv("accept", "*/*"),
makeNv("user-agent", user_agent)
};
const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
if (sid < 0) {
if (req.path) |p| self.allocator.free(p);
req.stream_id = -1;
return error.Http2Error;
}
req.stream_id = sid;
try self.flush();
}
pub fn run(self: *Http2Client) !void {
const run_start: u64 = @intCast(std.time.nanoTimestamp());
var loop_count: u32 = 0;
var last_done: usize = 0;
var last_report: u64 = run_start;
while (self.requests_done < self.request_count) {
if (uv.uv_run(self.loop, uv.RUN_ONCE) == 0) break;
try self.flush();
loop_count += 1;
const now: u64 = @intCast(std.time.nanoTimestamp());
if (now - last_report > 1_000_000_000) {
const done_delta = self.requests_done - last_done;
debug.log(" h2: progress {d}/{d} (+{d} in last 1s) loops={d}", .{
self.requests_done, self.request_count,
done_delta, loop_count,
});
last_done = self.requests_done;
last_report = now;
}
}
const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
const elapsed_ms = elapsed_ns / 1_000_000;
debug.log(" h2: run complete in {d}ms, {d} loops, {d}/{d} done", .{
elapsed_ms, loop_count,
self.requests_done, self.request_count,
});
var error_count: usize = 0;
for (self.requests[0..self.request_count]) |req| {
if (req.has_error) error_count += 1;
}
if (error_count > 0) {
debug.log(" h2: {d} requests had errors", .{error_count});
return error.ResponseError;
}
}
};
pub const TarballCtx = struct {
handler: StreamHandler,
done: bool,
has_error: bool,
url: []const u8,
start_ns: u64,
bytes: usize,
};
const TarballStats = struct {
url: []const u8,
bytes: usize,
elapsed_ms: u64,
};
const TarballCallbacks = struct {
fn onData(data: []const u8, ud: ?*anyopaque) void {
const ctx: *TarballCtx = @ptrCast(@alignCast(ud));
ctx.bytes += data.len;
ctx.handler.on_data(data, ctx.handler.user_data);
}
fn onComplete(status: u16, ud: ?*anyopaque) void {
const ctx: *TarballCtx = @ptrCast(@alignCast(ud));
ctx.handler.on_complete(status, ctx.handler.user_data);
ctx.done = true;
}
fn onError(err: FetchError, ud: ?*anyopaque) void {
const ctx: *TarballCtx = @ptrCast(@alignCast(ud));
ctx.handler.on_error(err, ctx.handler.user_data);
ctx.done = true;
ctx.has_error = true;
}
};
pub const Fetcher = struct {
allocator: std.mem.Allocator,
registry_host: []const u8,
meta_clients: [NUM_META_CONNECTIONS]?*Http2Client,
meta_clients_initialized: bool,
pending: std.ArrayListUnmanaged(PendingRequest),
tarball_clients: [NUM_CONNECTIONS]?*Http2Client,
tarball_clients_initialized: bool,
tarball_contexts: std.ArrayListUnmanaged(*TarballCtx),
tarball_round_robin: usize,
tarball_stats: std.ArrayListUnmanaged(TarballStats),
last_http_error_url: ?[]u8,
last_http_error_status: u16,
pub const HttpErrorInfo = struct {
url: []const u8,
status: u16,
};
pub fn init(allocator: std.mem.Allocator, registry_host: []const u8) !*Fetcher {
const f = try allocator.create(Fetcher);
f.* = .{
.allocator = allocator,
.registry_host = try allocator.dupe(u8, registry_host),
.meta_clients = [_]?*Http2Client{null} ** NUM_META_CONNECTIONS,
.meta_clients_initialized = false,
.pending = .{},
.tarball_clients = [_]?*Http2Client{null} ** NUM_CONNECTIONS,
.tarball_clients_initialized = false,
.tarball_contexts = .{},
.tarball_round_robin = 0,
.tarball_stats = .{},
.last_http_error_url = null,
.last_http_error_status = 0,
};
return f;
}
pub fn deinit(self: *Fetcher) void {
if (self.last_http_error_url) |url| self.allocator.free(url);
for (&self.meta_clients) |*maybe_client| {
if (maybe_client.*) |c| { c.deinit(); maybe_client.* = null; }
}
for (self.pending.items) |req| self.allocator.free(req.url);
self.pending.deinit(self.allocator);
for (&self.tarball_clients) |*maybe_client| {
if (maybe_client.*) |c| { c.deinit(); maybe_client.* = null; }
}
for (self.tarball_contexts.items) |ctx| {
self.allocator.free(ctx.url);
self.allocator.destroy(ctx);
}
self.tarball_contexts.deinit(self.allocator);
for (self.tarball_stats.items) |stat| self.allocator.free(stat.url);
self.tarball_stats.deinit(self.allocator);
self.allocator.free(self.registry_host);
self.allocator.destroy(self);
}
fn clearLastHttpError(self: *Fetcher) void {
if (self.last_http_error_url) |url| self.allocator.free(url);
self.last_http_error_url = null;
self.last_http_error_status = 0;
}
fn setLastHttpError(self: *Fetcher, url: []const u8, status: u16) void {
self.clearLastHttpError();
self.last_http_error_url = self.allocator.dupe(u8, url) catch null;
self.last_http_error_status = status;
}
pub fn getLastHttpError(self: *const Fetcher) ?HttpErrorInfo {
const url = self.last_http_error_url orelse return null;
return .{ .url = url, .status = self.last_http_error_status };
}
fn ensureMetaClients(self: *Fetcher) !void {
if (self.meta_clients_initialized) return;
for (&self.meta_clients, 0..) |*slot, i| {
const client = Http2Client.init(self.allocator, self.registry_host, true) catch |err| {
debug.log("fetcher: failed to init meta connection {d}: {}", .{ i, err });
continue;
};
client.ensureConnected() catch |err| {
debug.log("fetcher: failed to connect meta {d}: {}", .{ i, err });
client.deinit();
continue;
};
slot.* = client;
}
var any_connected = false;
for (self.meta_clients) |slot| {
if (slot != null) { any_connected = true; break; }
}
if (!any_connected) return error.ConnectionFailed;
self.meta_clients_initialized = true;
}
pub fn resetMetaClients(self: *Fetcher) void {
self.clearLastHttpError();
for (&self.meta_clients) |*slot| {
if (slot.*) |client| { client.deinit(); slot.* = null; }
}
self.meta_clients_initialized = false;
}
fn ensureTarballClients(self: *Fetcher) !void {
if (self.tarball_clients_initialized) return;
debug.log("fetcher: initializing {d} persistent connections", .{NUM_CONNECTIONS});
const init_start: u64 = @intCast(std.time.nanoTimestamp());
for (&self.tarball_clients, 0..) |*slot, i| {
const client = Http2Client.init(self.allocator, self.registry_host, true) catch |err| {
debug.log("fetcher: failed to init connection {d}: {}", .{ i, err });
continue;
};
client.ensureConnected() catch |err| {
debug.log("fetcher: failed to connect {d}: {}", .{ i, err });
client.deinit();
continue;
};
slot.* = client;
}
var any_connected = false;
for (self.tarball_clients) |slot| {
if (slot != null) { any_connected = true; break; }
}
if (!any_connected) return error.ConnectionFailed;
self.tarball_clients_initialized = true;
_ = debug.timer("fetcher: connection pool init", init_start);
}
fn findAvailableClient(self: *Fetcher) ?struct { client: *Http2Client, idx: usize } {
var attempts: usize = 0;
while (attempts < NUM_CONNECTIONS) : (attempts += 1) {
const idx = (self.tarball_round_robin + attempts) % NUM_CONNECTIONS;
if (self.tarball_clients[idx]) |client| { if (client.hasCapacity()) return .{ .client = client, .idx = idx }; }
}
return null;
}
pub fn initiateTarballConnectionsAsync(self: *Fetcher) void {
if (self.tarball_clients_initialized) return;
debug.log("fetcher: initiating {d} tarball connections (async)", .{NUM_CONNECTIONS});
for (&self.tarball_clients, 0..) |*slot, i| {
const client = Http2Client.init(self.allocator, self.registry_host, true) catch {
continue;
};
client.initiateConnectAsync() catch {
client.deinit(); continue;
};
slot.* = client; _ = i;
}
var any_connected = false;
for (self.tarball_clients) |slot| {
if (slot != null) { any_connected = true; break; }
}
if (any_connected) self.tarball_clients_initialized = true;
}
pub fn queueTarballAsync(self: *Fetcher, url: []const u8, handler: StreamHandler) !void {
try self.ensureTarballClients();
const parsed = try ParsedUrl.parse(url);
const available = self.findAvailableClient() orelse {
try self.pending.append(self.allocator, .{
.url = try self.allocator.dupe(u8, url),
.handler = handler,
}); return;
};
const ctx = try self.allocator.create(TarballCtx);
ctx.* = .{
.handler = handler,
.done = false,
.has_error = false,
.url = try self.allocator.dupe(u8, url),
.start_ns = @intCast(std.time.nanoTimestamp()),
.bytes = 0,
};
try self.tarball_contexts.append(
self.allocator,
ctx
);
try available.client.getStream(
parsed.path,
TarballCallbacks.onData,
TarballCallbacks.onComplete,
TarballCallbacks.onError,
ctx,
);
self.tarball_round_robin = (available.idx + 1) % NUM_CONNECTIONS;
}
pub fn tick(self: *Fetcher) usize {
self.ensureTarballClients() catch return 0;
const loop = uv.uv_default_loop();
for (&self.tarball_clients) |maybe_client| {
if (maybe_client) |c| c.flush() catch {};
}
_ = uv.uv_run(loop, uv.RUN_NOWAIT);
for (&self.tarball_clients) |maybe_client| {
if (maybe_client) |c| c.recycleCompletedRequests();
}
const completed = self.cleanupCompletedContexts();
self.dispatchPending();
return completed;
}
fn cleanupCompletedContexts(self: *Fetcher) usize {
var completed: usize = 0; var i: usize = 0;
while (i < self.tarball_contexts.items.len) {
const ctx = self.tarball_contexts.items[i];
if (ctx.done) {
completed += 1;
self.allocator.free(ctx.url);
self.allocator.destroy(ctx);
_ = self.tarball_contexts.swapRemove(i);
} else i += 1;
}
return completed;
}
fn dispatchPending(self: *Fetcher) void {
while (self.pending.items.len > 0) {
const available = self.findAvailableClient() orelse break;
const req = self.pending.pop() orelse break;
const handler = req.handler orelse {
self.allocator.free(req.url); continue;
};
self.dispatchRequest(available.client, req.url, handler) catch |err| {
handler.on_error(errToFetchError(err), handler.user_data);
self.allocator.free(req.url); continue;
};
}
}
fn dispatchRequest(self: *Fetcher, client: *Http2Client, url: []const u8, handler: StreamHandler) !void {
const parsed = try ParsedUrl.parse(url);
const ctx = try self.allocator.create(TarballCtx);
ctx.* = .{
.handler = handler,
.done = false,
.has_error = false,
.url = url,
.start_ns = @intCast(std.time.nanoTimestamp()),
.bytes = 0,
};
errdefer self.allocator.destroy(ctx);
try self.tarball_contexts.append(self.allocator, ctx);
try client.getStream(
parsed.path,
TarballCallbacks.onData,
TarballCallbacks.onComplete,
TarballCallbacks.onError,
ctx,
);
}
fn errToFetchError(err: anyerror) FetchError {
return switch (err) {
error.InvalidUrl => FetchError.InvalidUrl,
error.OutOfMemory => FetchError.OutOfMemory,
else => FetchError.Http2Error,
};
}
pub fn pendingTarballCount(self: *Fetcher) usize {
return self.tarball_contexts.items.len;
}
pub fn finishTarballs(self: *Fetcher) void {
const loop = uv.uv_default_loop();
var last_report: u64 = @intCast(std.time.nanoTimestamp());
var loops: usize = 0;
var completed: usize = 0;
const start = last_report;
while (self.tarball_contexts.items.len > 0 or self.pending.items.len > 0) {
for (&self.tarball_clients) |maybe_client| {
if (maybe_client) |c| c.flush() catch {};
}
if (uv.uv_run(loop, uv.RUN_ONCE) == 0 and self.pending.items.len == 0 and self.tarball_contexts.items.len == 0) break;
loops += 1;
for (&self.tarball_clients) |maybe_client| {
if (maybe_client) |c| c.recycleCompletedRequests();
}
var i: usize = 0;
while (i < self.tarball_contexts.items.len) {
const ctx = self.tarball_contexts.items[i];
if (ctx.done) {
if (!ctx.has_error) {
const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - ctx.start_ns) / 1_000_000);
const url_copy = self.allocator.dupe(u8, ctx.url) catch null;
if (url_copy) |url| {
self.tarball_stats.append(self.allocator, .{ .url = url, .bytes = ctx.bytes, .elapsed_ms = elapsed_ms }) catch {};
}
}
self.allocator.free(ctx.url);
self.allocator.destroy(ctx);
_ = self.tarball_contexts.swapRemove(i);
completed += 1;
} else {
i += 1;
}
}
while (self.pending.items.len > 0) {
var queued = false;
for (&self.tarball_clients, 0..) |maybe_client, conn_idx| {
if (maybe_client) |client| {
if (client.hasCapacity()) {
const maybe_req = self.pending.pop();
const req = maybe_req orelse break;
if (req.handler) |handler| {
const parsed = ParsedUrl.parse(req.url) catch {
handler.on_error(FetchError.InvalidUrl, handler.user_data);
self.allocator.free(req.url);
continue;
};
const ctx = self.allocator.create(TarballCtx) catch {
handler.on_error(FetchError.OutOfMemory, handler.user_data);
self.allocator.free(req.url);
continue;
};
ctx.* = .{
.handler = handler,
.done = false,
.has_error = false,
.url = req.url,
.start_ns = @intCast(std.time.nanoTimestamp()),
.bytes = 0,
};
self.tarball_contexts.append(self.allocator, ctx) catch {
self.allocator.destroy(ctx);
self.allocator.free(req.url);
continue;
};
client.getStream(
parsed.path,
struct {
fn onData(data: []const u8, ud: ?*anyopaque) void {
const c: *TarballCtx = @ptrCast(@alignCast(ud));
c.bytes += data.len;
c.handler.on_data(data, c.handler.user_data);
}
}.onData,
struct {
fn onComplete(status: u16, ud: ?*anyopaque) void {
const c: *TarballCtx = @ptrCast(@alignCast(ud));
c.handler.on_complete(status, c.handler.user_data);
if (debug.enabled) {
const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - c.start_ns) / 1_000_000);
debug.log(" tarball: done {s} {d}ms {d} bytes status={d}", .{ c.url, elapsed_ms, c.bytes, status });
}
c.done = true;
}
}.onComplete,
struct {
fn onError(err: FetchError, ud: ?*anyopaque) void {
const c: *TarballCtx = @ptrCast(@alignCast(ud));
c.handler.on_error(err, c.handler.user_data);
if (debug.enabled) {
const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - c.start_ns) / 1_000_000);
debug.log(" tarball: error {s} {d}ms {d} bytes", .{ c.url, elapsed_ms, c.bytes });
}
c.done = true;
c.has_error = true;
}
}.onError,
ctx,
) catch {
handler.on_error(FetchError.Http2Error, handler.user_data);
ctx.done = true;
};
queued = true;
_ = conn_idx;
} else {
self.allocator.free(req.url);
}
break;
}
}
}
if (!queued) break;
}
const now: u64 = @intCast(std.time.nanoTimestamp());
if (now - last_report > 1_000_000_000) {
var total_bytes: usize = 0;
for (self.tarball_contexts.items) |ctx| {
total_bytes += ctx.bytes;
}
debug.log(" h2: {d} in-flight, {d} pending, {d} completed, {d} loops", .{
self.tarball_contexts.items.len,
self.pending.items.len,
completed,
loops,
});
debug.log(" h2: tarball progress in-flight bytes={d}", .{ total_bytes });
last_report = now;
}
}
const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, start));
debug.log("fetcher: finishTarballs completed in {d}ms, {d} loops, {d} completed", .{
elapsed_ns / 1_000_000,
loops,
completed,
});
if (debug.enabled and self.tarball_stats.items.len > 0) {
var top_time: [5]?TarballStats = .{null} ** 5;
var top_size: [5]?TarballStats = .{null} ** 5;
for (self.tarball_stats.items) |stat| {
var idx_time: usize = top_time.len;
for (top_time, 0..) |slot, i| {
if (slot == null or stat.elapsed_ms > slot.?.elapsed_ms) {
idx_time = i;
break;
}
}
if (idx_time < top_time.len) {
var carry = stat;
var j = idx_time;
while (j < top_time.len) : (j += 1) {
const next = top_time[j];
top_time[j] = carry;
if (next) |n| {
carry = n;
} else {
break;
}
}
}
var idx_size: usize = top_size.len;
for (top_size, 0..) |slot, i| {
if (slot == null or stat.bytes > slot.?.bytes) {
idx_size = i;
break;
}
}
if (idx_size < top_size.len) {
var carry_size = stat;
var k = idx_size;
while (k < top_size.len) : (k += 1) {
const next_size = top_size[k];
top_size[k] = carry_size;
if (next_size) |n| {
carry_size = n;
} else {
break;
}
}
}
}
debug.log("fetcher: top tarballs by time", .{});
for (top_time, 0..) |maybe_stat, i| {
if (maybe_stat) |stat| {
debug.log(" {d}. {s} {d}ms {d} bytes", .{ i + 1, stat.url, stat.elapsed_ms, stat.bytes });
}
}
debug.log("fetcher: top tarballs by size", .{});
for (top_size, 0..) |maybe_stat, i| {
if (maybe_stat) |stat| {
debug.log(" {d}. {s} {d} bytes {d}ms", .{ i + 1, stat.url, stat.bytes, stat.elapsed_ms });
}
}
}
}
pub fn fetchMetadata(self: *Fetcher, package_name: []const u8, allocator: std.mem.Allocator) ![]u8 {
return self.fetchMetadataFull(package_name, false, allocator);
}
const DecodedMetadata = struct {
data: []u8,
compressed: bool,
};
fn metaClientCanQueue(c: *const Http2Client) bool {
return c.h2_session != null and c.connected == 1 and c.request_count < MAX_PENDING_REQUESTS - 1;
}
fn nextMetaClient(self: *Fetcher, conn_idx: *usize) ?*Http2Client {
var attempts: usize = 0;
while (attempts < NUM_META_CONNECTIONS) : (attempts += 1) {
if (self.meta_clients[conn_idx.*]) |c| {
if (metaClientCanQueue(c)) return c;
}
conn_idx.* = (conn_idx.* + 1) % NUM_META_CONNECTIONS;
}
return null;
}
fn flushMetaClients(self: *Fetcher) void {
for (self.meta_clients) |maybe_client| {
if (maybe_client) |c| c.flush() catch {};
}
}
fn metaRequestsComplete(self: *Fetcher) bool {
for (self.meta_clients) |maybe_client| {
if (maybe_client) |c| {
for (c.requests[0..c.request_count]) |*req| {
if (!req.done and !req.has_error) return false;
}
}
}
return true;
}
fn decodeMetadataOwned(
req: *Http2Client.RequestState,
allocator: std.mem.Allocator,
decompress_buf: *std.ArrayListUnmanaged(u8),
) ?DecodedMetadata {
if (req.has_error or req.status_code != 200) return null;
if (req.content_encoding != .gzip) {
const data = allocator.dupe(u8, req.response_body.items) catch return null;
return .{ .data = data, .compressed = false };
}
decompress_buf.clearRetainingCapacity();
const decomp = extractor.GzipDecompressor.init(allocator) catch return null;
defer decomp.deinit();
_ = decomp.decompress(req.response_body.items, struct {
fn onChunk(data: []const u8, ctx: ?*anyopaque) anyerror!void {
const buf: *std.ArrayListUnmanaged(u8) = @ptrCast(@alignCast(ctx));
try buf.appendSlice(c_allocator, data);
}
}.onChunk, decompress_buf) catch return null;
const data = allocator.dupe(u8, decompress_buf.items) catch return null;
return .{ .data = data, .compressed = true };
}
pub fn fetchMetadataFull(self: *Fetcher, package_name: []const u8, full: bool, allocator: std.mem.Allocator) ![]u8 {
try self.ensureMetaClients();
self.clearLastHttpError();
for (self.meta_clients) |maybe_client| {
if (maybe_client) |client| {
var path_buf: [512]u8 = undefined;
const path_slice = std.fmt.bufPrint(&path_buf, "/{s}", .{package_name}) catch return error.OutOfMemory;
const accept: [:0]const u8 = if (full) "application/json" else "application/vnd.npm.install-v1+json";
return client.getWithAccept(path_slice, accept, allocator) catch |err| {
if (err == error.ResponseError) {
var url_buf: [1024]u8 = undefined;
const url = std.fmt.bufPrint(&url_buf, "https://{s}/{s}", .{ self.registry_host, package_name }) catch "";
self.setLastHttpError(url, client.last_response_status_code);
}
return err;
};
}
}
return error.ConnectionFailed;
}
pub const MetadataResult = struct {
name: []const u8,
data: ?[]u8,
compressed: bool,
has_error: bool,
};
fn storeMetadataBatchResult(
req: *Http2Client.RequestState,
result: *MetadataResult,
allocator: std.mem.Allocator,
decompress_buf: *std.ArrayListUnmanaged(u8),
) bool {
const decoded = decodeMetadataOwned(req, allocator, decompress_buf) orelse {
result.has_error = true;
return false;
};
result.data = decoded.data;
result.compressed = decoded.compressed;
return true;
}
pub fn fetchMetadataBatch(self: *Fetcher, names: []const []const u8, allocator: std.mem.Allocator) ![]MetadataResult {
if (names.len == 0) return &[_]MetadataResult{};
var total_start: u64 = @intCast(std.time.nanoTimestamp());
try self.ensureMetaClients();
total_start = debug.timer(" meta: get clients", total_start);
var active_connections: usize = 0;
for (self.meta_clients) |maybe_client| {
if (maybe_client != null) active_connections += 1;
} if (active_connections == 0) return error.ConnectionFailed;
debug.log(" meta: batch {d} packages across {d} connections", .{ names.len, active_connections });
var results = try allocator.alloc(MetadataResult, names.len);
for (results, 0..) |*r, i| {
r.* = .{ .name = names[i], .data = null, .compressed = false, .has_error = false };
}
const total_capacity = active_connections * (MAX_PENDING_REQUESTS - 1);
var offset: usize = 0;
var batch_num: usize = 0;
var decompress_buf = std.ArrayListUnmanaged(u8){};
defer decompress_buf.deinit(c_allocator);
while (offset < names.len) {
const end = @min(offset + total_capacity, names.len);
var batch_start: u64 = @intCast(std.time.nanoTimestamp());
debug.log(" meta: batch {d} ({d}-{d})", .{ batch_num, offset, end });
var queued: usize = 0;
var conn_idx: usize = 0;
for (offset..end) |i| {
const result = &results[i];
const name = names[i];
const c = self.nextMetaClient(&conn_idx) orelse {
result.has_error = true; continue;
};
const session = c.h2_session orelse {
result.has_error = true; continue;
};
var path_buf: [512]u8 = undefined;
const path = std.fmt.bufPrint(&path_buf, "/{s}", .{name}) catch {
result.has_error = true;
continue;
};
var hdrs = [_]nghttp2.nv{
Http2Client.makeNv(":method", "GET"),
Http2Client.makeNv(":path", c.allocator.dupeZ(u8, path) catch {
result.has_error = true; continue;
}),
Http2Client.makeNv(":scheme", "https"),
Http2Client.makeNv(":authority", c.host),
Http2Client.makeNv("accept", "application/vnd.npm.install-v1+json"),
Http2Client.makeNv("accept-encoding", "gzip"),
Http2Client.makeNv("user-agent", user_agent),
};
const req = &c.requests[c.request_count];
c.request_count += 1;
req.* = .{
.stream_id = 0,
.path = hdrs[1].value[0..hdrs[1].valuelen :0],
.on_data = null,
.on_complete = null,
.on_error = null,
.userdata = result,
.response_body = .{},
.status_code = 0,
.done = false,
.has_error = false,
.start_ns = @intCast(std.time.nanoTimestamp()),
.end_ns = 0,
.bytes = 0,
.content_encoding = .identity,
};
const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
if (sid < 0) {
c.request_count -= 1;
if (req.path) |p| c.allocator.free(p);
result.has_error = true;
continue;
}
req.stream_id = sid;
queued += 1;
conn_idx = (conn_idx + 1) % NUM_META_CONNECTIONS;
}
batch_start = debug.timer(" meta: queue requests", batch_start);
self.flushMetaClients();
const loop = uv.uv_default_loop();
var all_done = false;
var loops: usize = 0;
const run_start: u64 = @intCast(std.time.nanoTimestamp());
while (!all_done) {
_ = uv.uv_run(loop, uv.RUN_ONCE);
loops += 1;
all_done = self.metaRequestsComplete();
}
const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
debug.log(" h2: run complete in {d}ms, {d} loops", .{ elapsed_ns / 1_000_000, loops });
batch_start = debug.timer(" meta: run h2 loop", batch_start);
var slow_count: usize = 0;
var max_req_ms: u64 = 0;
var max_req_name: []const u8 = "";
var total_bytes: usize = 0;
for (self.meta_clients) |maybe_client| {
if (maybe_client) |c| {
for (c.requests[0..c.request_count]) |*req| {
const result: *MetadataResult = @ptrCast(@alignCast(req.userdata));
const end_ns = if (req.end_ns != 0) req.end_ns else @as(u64, @intCast(std.time.nanoTimestamp()));
const duration_ms: u64 = @intCast((end_ns - req.start_ns) / 1_000_000);
total_bytes += req.response_body.items.len;
if (duration_ms > max_req_ms) {
max_req_ms = duration_ms;
max_req_name = result.name;
}
if (duration_ms >= META_SLOW_LOG_MS) {
slow_count += 1;
debug.log(" meta: slow {s} {d}ms {d} bytes status={d}", .{
result.name,
duration_ms,
req.response_body.items.len,
req.status_code,
});
}
}
}
}
debug.log(" meta: summary slow={d} max={s} {d}ms total_bytes={d}", .{ slow_count, max_req_name, max_req_ms, total_bytes });
var success: usize = 0;
for (self.meta_clients) |maybe_client| {
if (maybe_client) |c| {
for (c.requests[0..c.request_count]) |*req| {
const result: *MetadataResult = @ptrCast(@alignCast(req.userdata));
if (storeMetadataBatchResult(req, result, allocator, &decompress_buf)) success += 1;
}
c.resetRequests();
}
}
_ = debug.timer(" meta: copy results", batch_start);
debug.log(" meta: queued={d} success={d}", .{ queued, success });
offset = end;
batch_num += 1;
}
return results;
}
pub const MetadataCallback = *const fn (
name: []const u8,
data: ?[]const u8,
has_error: bool,
userdata: ?*anyopaque
) void;
const MetadataStreamTracker = struct {
name: []const u8,
index: usize,
};
fn emitMetadataStreamingResult(
req: *Http2Client.RequestState,
name: []const u8,
allocator: std.mem.Allocator,
decompress_buf: *std.ArrayListUnmanaged(u8),
callback: MetadataCallback,
userdata: ?*anyopaque,
) void {
if (req.has_error or req.status_code != 200) {
callback(name, null, true, userdata);
return;
}
if (req.content_encoding != .gzip) {
callback(name, req.response_body.items, false, userdata);
return;
}
decompress_buf.clearRetainingCapacity();
const decomp = extractor.GzipDecompressor.init(allocator) catch {
callback(name, null, true, userdata);
return;
};
defer decomp.deinit();
_ = decomp.decompress(req.response_body.items, struct {
fn onChunk(data: []const u8, ctx: ?*anyopaque) anyerror!void {
const buf: *std.ArrayListUnmanaged(u8) = @ptrCast(@alignCast(ctx));
try buf.appendSlice(c_allocator, data);
}
}.onChunk, decompress_buf) catch {
callback(name, null, true, userdata);
return;
};
callback(name, decompress_buf.items, false, userdata);
}
fn emitCompletedStreamingMetadataCallbacks(
self: *Fetcher,
processed: []bool,
allocator: std.mem.Allocator,
decompress_buf: *std.ArrayListUnmanaged(u8),
callback: MetadataCallback,
userdata: ?*anyopaque,
) void {
for (self.meta_clients) |maybe_client| {
const c = maybe_client orelse continue;
for (c.requests[0..c.request_count]) |*req| {
if (!req.done and !req.has_error) continue;
const tracker: *MetadataStreamTracker = @ptrCast(@alignCast(req.userdata));
if (processed[tracker.index]) continue;
processed[tracker.index] = true;
emitMetadataStreamingResult(req, tracker.name, allocator, decompress_buf, callback, userdata);
}
}
}
pub fn fetchMetadataStreaming(
self: *Fetcher,
names: []const []const u8,
allocator: std.mem.Allocator,
callback: MetadataCallback,
userdata: ?*anyopaque,
) !void {
if (names.len == 0) return;
var total_start: u64 = @intCast(std.time.nanoTimestamp());
try self.ensureMetaClients();
total_start = debug.timer(" meta: get clients", total_start);
var active_connections: usize = 0;
for (self.meta_clients) |maybe_client| {
if (maybe_client != null) active_connections += 1;
}
if (active_connections == 0) return error.ConnectionFailed;
debug.log(" meta: streaming {d} packages across {d} connections", .{ names.len, active_connections });
const processed = try allocator.alloc(bool, names.len);
defer allocator.free(processed);
@memset(processed, false);
var trackers = try allocator.alloc(MetadataStreamTracker, names.len);
defer allocator.free(trackers);
for (names, 0..) |name, i| {
trackers[i] = .{ .name = name, .index = i };
}
const total_capacity = active_connections * (MAX_PENDING_REQUESTS - 1);
var offset: usize = 0;
var batch_num: usize = 0;
var decompress_buf = std.ArrayListUnmanaged(u8){};
defer decompress_buf.deinit(c_allocator);
while (offset < names.len) {
const end = @min(offset + total_capacity, names.len);
var batch_start: u64 = @intCast(std.time.nanoTimestamp());
debug.log(" meta: batch {d} ({d}-{d})", .{ batch_num, offset, end });
var queued: usize = 0;
var conn_idx: usize = 0;
for (offset..end) |i| {
const tracker = &trackers[i];
const name = names[i];
const c = self.nextMetaClient(&conn_idx) orelse continue;
const session = c.h2_session orelse continue;
var path_buf: [512]u8 = undefined;
const path = std.fmt.bufPrint(&path_buf, "/{s}", .{name}) catch continue;
var hdrs = [_]nghttp2.nv{
Http2Client.makeNv(":method", "GET"),
Http2Client.makeNv(":path", c.allocator.dupeZ(u8, path) catch continue),
Http2Client.makeNv(":scheme", "https"),
Http2Client.makeNv(":authority", c.host),
Http2Client.makeNv("accept", "application/vnd.npm.install-v1+json"),
Http2Client.makeNv("accept-encoding", "gzip"),
Http2Client.makeNv("user-agent", user_agent),
};
const req = &c.requests[c.request_count];
c.request_count += 1;
req.* = .{
.stream_id = 0,
.path = hdrs[1].value[0..hdrs[1].valuelen :0],
.on_data = null,
.on_complete = null,
.on_error = null,
.userdata = tracker,
.response_body = .{},
.status_code = 0,
.done = false,
.has_error = false,
.start_ns = 0,
.end_ns = 0,
.bytes = 0,
.content_encoding = .identity,
};
req.start_ns = @intCast(std.time.nanoTimestamp());
const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
if (sid < 0) {
c.request_count -= 1;
if (req.path) |p| c.allocator.free(p);
continue;
}
req.stream_id = sid;
queued += 1;
conn_idx = (conn_idx + 1) % NUM_META_CONNECTIONS;
}
batch_start = debug.timer(" meta: queue requests", batch_start);
self.flushMetaClients();
const loop = uv.uv_default_loop();
var all_done = false;
var loops: usize = 0;
const run_start: u64 = @intCast(std.time.nanoTimestamp());
while (!all_done) {
_ = uv.uv_run(loop, uv.RUN_ONCE);
loops += 1;
self.emitCompletedStreamingMetadataCallbacks(processed, allocator, &decompress_buf, callback, userdata);
all_done = self.metaRequestsComplete();
}
const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
debug.log(" h2: run complete in {d}ms, {d} loops", .{ elapsed_ns / 1_000_000, loops });
var slow_count: usize = 0;
var max_req_ms: u64 = 0;
var max_req_name: []const u8 = "";
var total_bytes: usize = 0;
for (self.meta_clients) |maybe_client| {
if (maybe_client) |c| {
for (c.requests[0..c.request_count]) |*req| {
const tracker: *MetadataStreamTracker = @ptrCast(@alignCast(req.userdata));
const end_ns = if (req.end_ns != 0) req.end_ns else @as(u64, @intCast(std.time.nanoTimestamp()));
const duration_ms: u64 = @intCast((end_ns - req.start_ns) / 1_000_000);
total_bytes += req.response_body.items.len;
if (duration_ms > max_req_ms) {
max_req_ms = duration_ms;
max_req_name = tracker.name;
}
if (duration_ms >= META_SLOW_LOG_MS) {
slow_count += 1;
debug.log(" meta: slow {s} {d}ms {d} bytes status={d}", .{
tracker.name,
duration_ms,
req.response_body.items.len,
req.status_code,
});
}
}
}
}
debug.log(" meta: summary slow={d} max={s} {d}ms total_bytes={d}", .{ slow_count, max_req_name, max_req_ms, total_bytes });
for (self.meta_clients) |maybe_client| {
if (maybe_client) |c| c.resetRequests();
}
offset = end;
batch_num += 1;
}
}
pub fn fetchTarball(self: *Fetcher, url: []const u8, handler: StreamHandler) !void {
try self.pending.append(self.allocator, .{ .url = try self.allocator.dupe(u8, url), .handler = handler });
}
pub fn run(self: *Fetcher) !void {
if (self.pending.items.len == 0 and self.tarball_contexts.items.len == 0) return;
const run_start: u64 = @intCast(std.time.nanoTimestamp());
const total_requests = self.pending.items.len + self.tarball_contexts.items.len;
debug.log("fetcher: {d} tarballs to download (pending={d}, in-flight={d})", .{
total_requests,
self.pending.items.len,
self.tarball_contexts.items.len,
});
try self.ensureTarballClients();
self.finishTarballs();
const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
debug.log("fetcher: {d} tarballs complete in {d}ms", .{ total_requests, elapsed_ns / 1_000_000 });
}
};

File Metadata

Mime Type
text/x-asm
Expires
Thu, Mar 26, 4:46 PM (2 d)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
511732
Default Alt Text
fetcher.zig (55 KB)

Event Timeline