Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F2916333
fetcher.zig
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
55 KB
Referenced Files
None
Subscribers
None
fetcher.zig
View Options
const
std
=
@import
(
"std"
);
const
c_allocator
=
std
.
heap
.
c_allocator
;
const
debug
=
@import
(
"debug.zig"
);
const
extractor
=
@import
(
"extractor.zig"
);
const
uv
=
@import
(
"uv.zig"
);
const
tlsuv
=
@import
(
"tlsuv.zig"
);
const
nghttp2
=
@import
(
"nghttp2.zig"
);
const
config
=
@import
(
"config"
);
const
user_agent
:
[
:
0
]
const
u8
=
"ant/"
++
config
.
version
;
/// Error set shared by the HTTP/2 client and the fetcher. Values are returned
/// as ordinary Zig errors and are also passed to `StreamHandler.on_error`.
pub const FetchError = error{
    /// The connection could not be established or was lost.
    ConnectionFailed,
    TlsError,
    /// nghttp2 rejected an operation or a stream closed with a non-zero error code.
    Http2Error,
    Timeout,
    /// The URL has no "://" separator or carries an unparsable port.
    InvalidUrl,
    /// A request finished with an error flag or an unexpected status code.
    ResponseError,
    OutOfMemory,
};
/// A URL split into the pieces the fetcher needs. All slices borrow from the
/// string handed to `parse` (or from a static "/"), so the input must outlive
/// the result.
pub const ParsedUrl = struct {
    scheme: []const u8,
    /// Host without the port. IPv6 literals keep their surrounding brackets.
    host: []const u8,
    /// Explicit port, or 443 for "https" and 80 for every other scheme.
    port: u16,
    /// Everything from the first '/' after the authority; "/" when absent.
    path: []const u8,

    /// Splits `url` into scheme, host, port and path.
    ///
    /// Returns `error.InvalidUrl` when the "://" separator is missing, when
    /// the scheme or host is empty, or when the port is not a valid u16.
    pub fn parse(url: []const u8) !ParsedUrl {
        const scheme_end = std.mem.indexOf(u8, url, "://") orelse return error.InvalidUrl;
        if (scheme_end == 0) return error.InvalidUrl;
        const scheme = url[0..scheme_end];
        const rest = url[scheme_end + 3 ..];

        const path_start = std.mem.indexOfScalar(u8, rest, '/') orelse rest.len;
        const authority = rest[0..path_start];
        const path: []const u8 = if (path_start < rest.len) rest[path_start..] else "/";

        var host: []const u8 = authority;
        var port: u16 = if (std.mem.eql(u8, scheme, "https")) 443 else 80;

        // A bracketed IPv6 literal contains ':' itself, so only a colon that
        // follows the closing bracket can introduce the port.
        const port_search_from = if (std.mem.lastIndexOfScalar(u8, authority, ']')) |close| close + 1 else 0;
        if (std.mem.indexOfScalarPos(u8, authority, port_search_from, ':')) |colon| {
            host = authority[0..colon];
            port = std.fmt.parseInt(u16, authority[colon + 1 ..], 10) catch return error.InvalidUrl;
        }
        if (host.len == 0) return error.InvalidUrl;

        return .{ .scheme = scheme, .host = host, .port = port, .path = path };
    }
};
/// Bundle of callbacks through which a streamed download is reported.
/// `user_data` is passed back unchanged as the last argument of every callback.
pub const StreamHandler = struct {
    /// Receives each chunk of body bytes.
    on_data: *const fn ([]const u8, ?*anyopaque) void,
    /// Receives the HTTP status code once the download finished.
    on_complete: *const fn (u16, ?*anyopaque) void,
    /// Receives the failure reason when the download could not complete.
    on_error: *const fn (FetchError, ?*anyopaque) void,
    user_data: ?*anyopaque,

    /// Convenience constructor that stores the four arguments verbatim.
    pub fn init(
        on_data: *const fn ([]const u8, ?*anyopaque) void,
        on_complete: *const fn (u16, ?*anyopaque) void,
        on_error: *const fn (FetchError, ?*anyopaque) void,
        user_data: ?*anyopaque,
    ) StreamHandler {
        const handler = StreamHandler{
            .user_data = user_data,
            .on_error = on_error,
            .on_complete = on_complete,
            .on_data = on_data,
        };
        return handler;
    }
};
/// A tarball download waiting in `Fetcher.pending` until a connection has a
/// free request slot.
const PendingRequest = struct {
    /// Heap copy of the URL, owned by the queue entry until it is dispatched.
    url: []const u8,
    /// Callbacks for the download; entries without a handler are dropped at dispatch.
    handler: ?StreamHandler,
};
// Size of each client's request-slot array; also advertised to the peer as
// SETTINGS_MAX_CONCURRENT_STREAMS.
const MAX_PENDING_REQUESTS = 20;
// Number of connections in the tarball pool (`Fetcher.tarball_clients`).
const NUM_CONNECTIONS = 6;
// Number of connections in the metadata pool (`Fetcher.meta_clients`).
const NUM_META_CONNECTIONS = 3;
// NOTE(review): not referenced in the part of the file visible here; presumably
// the threshold above which a metadata request is logged as slow — confirm.
const META_SLOW_LOG_MS: u64 = 250;
const
Http2Client
=
struct
{
allocator
:
std
.
mem
.
Allocator
,
loop
:
*
uv
.
loop_t
,
tls
:
tlsuv
.
stream_t
,
h2_session
:
?*
nghttp2
.
session
,
host
:
[
:
0
]
const
u8
,
use_tls
:
bool
,
connected
:
i32
,
connect_pending
:
bool
,
closing
:
bool
,
write_buf
:
std
.
ArrayListUnmanaged
(
u8
),
requests
:
[
MAX_PENDING_REQUESTS
]
RequestState
,
request_count
:
usize
,
requests_done
:
usize
,
last_response_status_code
:
u16
,
/// Bookkeeping for one request slot of an `Http2Client`.
const RequestState = struct {
    /// 0 until the request is submitted, then the nghttp2 stream id;
    /// -1 marks a slot that was recycled and may be reused.
    stream_id: i32,
    /// NUL-terminated copy of the request path, allocated with the client's allocator.
    path: ?[:0]const u8,
    /// Streaming callbacks; when `on_data` is null the body is buffered in `response_body`.
    on_data: ?*const fn ([]const u8, ?*anyopaque) void,
    on_complete: ?*const fn (u16, ?*anyopaque) void,
    on_error: ?*const fn (FetchError, ?*anyopaque) void,
    /// Opaque pointer handed back to the callbacks.
    userdata: ?*anyopaque,
    /// Buffered body for non-streaming requests.
    response_body: std.ArrayListUnmanaged(u8),
    /// Parsed from the ":status" response header; 0 when missing or unparsable.
    status_code: u16,
    /// Set once the stream ended, was closed, or the connection failed.
    done: bool,
    has_error: bool,
    /// Timestamps taken with std.time.nanoTimestamp at submit and at completion.
    start_ns: u64,
    end_ns: u64,
    /// Number of body bytes received so far.
    bytes: usize,
    /// Derived from the "content-encoding" response header.
    content_encoding: ContentEncoding,
};
/// Body encodings the client distinguishes. `gzip` is selected when the
/// "content-encoding" header value starts with "gzip"; everything else is
/// treated as `identity`.
const ContentEncoding = enum {
    identity,
    gzip,
};
/// ALPN protocol list handed to tlsuv_stream_set_protocols in `init`.
const alpn_protocols = [_][*:0]const u8{ "h2", "http/1.1" };
/// Allocates a client for `host` and prepares (but does not open) its TLS
/// stream on the default libuv loop.
///
/// The client owns a NUL-terminated copy of `host`. Returns
/// `error.ConnectionFailed` when the tlsuv stream cannot be initialised and
/// `error.OutOfMemory` when an allocation fails; nothing is leaked in either case.
pub fn init(allocator: std.mem.Allocator, host: []const u8, use_tls: bool) !*Http2Client {
    const client = try allocator.create(Http2Client);
    errdefer allocator.destroy(client);
    const host_z = try allocator.dupeZ(u8, host);
    errdefer allocator.free(host_z);

    client.* = .{
        .allocator = allocator,
        .loop = uv.uv_default_loop(),
        .tls = .{},
        .h2_session = null,
        .host = host_z,
        .use_tls = use_tls,
        .connected = 0,
        .connect_pending = false,
        .closing = false,
        .write_buf = .{},
        .requests = undefined,
        .request_count = 0,
        .requests_done = 0,
        .last_response_status_code = 0,
    };
    for (&client.requests) |*req| {
        req.* = .{
            .stream_id = 0,
            .path = null,
            .on_data = null,
            .on_complete = null,
            .on_error = null,
            .userdata = null,
            .response_body = .{},
            .status_code = 0,
            .done = false,
            .has_error = false,
            .start_ns = 0,
            .end_ns = 0,
            .bytes = 0,
            .content_encoding = .identity,
        };
    }

    // The errdefers above release host_z and client on this path. Freeing
    // them by hand here as well would release both allocations twice.
    if (tlsuv.tlsuv_stream_init(client.loop, &client.tls, null) != 0) {
        return error.ConnectionFailed;
    }
    _ = tlsuv.tlsuv_stream_set_hostname(&client.tls, host_z.ptr);
    _ = tlsuv.tlsuv_stream_set_protocols(&client.tls, 2, &alpn_protocols);
    return client;
}
/// Shuts the connection down and frees the client, including `self`.
///
/// Callbacks of all request slots are detached first so nothing is reported
/// to callers during teardown. The event loop is pumped until the TLS stream
/// reports that it is closed. If a connect is still in flight, the loop is
/// pumped until `onConnect` has run, because its `ConnectCtx` points at this
/// client and would otherwise be used after the client is freed.
pub fn deinit(self: *Http2Client) void {
    self.closing = true;

    for (&self.requests) |*req| {
        req.on_data = null;
        req.on_complete = null;
        req.on_error = null;
        req.userdata = null;
    }

    // onStreamClose recovers the client through tls.data.
    self.tls.data = self;

    if (self.connect_pending) {
        // With `closing` set, onConnect marks the client as failed (-1) and
        // requests the stream close itself; onStreamClose then stores -2.
        while (self.connect_pending) {
            if (uv.uv_run(self.loop, uv.RUN_ONCE) == 0) break;
        }
        if (!self.connect_pending) {
            while (self.connected == -1) {
                if (uv.uv_run(self.loop, uv.RUN_ONCE) == 0) break;
            }
        }
    } else if (self.connected > 0) {
        _ = tlsuv.tlsuv_stream_close(&self.tls, onStreamClose);
        while (self.connected > 0) _ = uv.uv_run(self.loop, uv.RUN_ONCE);
    }
    self.connect_pending = false;
    // NOTE(review): a client whose connect failed (connected == -1) never has
    // its tlsuv stream closed here — confirm tlsuv does not require that.

    if (self.h2_session) |session| nghttp2.nghttp2_session_del(session);

    for (&self.requests) |*req| {
        // Recycled slots (-1) have already released their path and body.
        if (req.stream_id != -1) {
            if (req.path) |p| self.allocator.free(p);
            req.response_body.deinit(self.allocator);
        }
    }
    self.write_buf.deinit(self.allocator);
    self.allocator.free(self.host);
    self.allocator.destroy(self);
}
/// Releases every slot in use and returns the client to an empty request
/// table (`request_count` and `requests_done` both zero).
pub fn resetRequests(self: *Http2Client) void {
    const blank = RequestState{
        .stream_id = 0,
        .path = null,
        .on_data = null,
        .on_complete = null,
        .on_error = null,
        .userdata = null,
        .response_body = .{},
        .status_code = 0,
        .done = false,
        .has_error = false,
        .start_ns = 0,
        .end_ns = 0,
        .bytes = 0,
        .content_encoding = .identity,
    };

    var index: usize = 0;
    while (index < self.request_count) : (index += 1) {
        const slot = &self.requests[index];
        // Recycled slots (-1) no longer own a path or a body buffer.
        if (slot.stream_id != -1) {
            if (slot.path) |owned_path| self.allocator.free(owned_path);
            slot.response_body.deinit(self.allocator);
        }
        slot.* = blank;
    }

    self.requests_done = 0;
    self.request_count = 0;
}
/// Reports whether `findOrAllocSlot` would currently succeed: either the
/// slot table still has an unused entry, or one of the used entries has been
/// recycled (stream id -1).
pub fn hasCapacity(self: *const Http2Client) bool {
    // Same bound as findOrAllocSlot; the previous `- 1` left the last slot
    // permanently unused.
    if (self.request_count < MAX_PENDING_REQUESTS) return true;
    for (self.requests[0..self.request_count]) |req| {
        if (req.stream_id == -1) return true;
    }
    return false;
}
/// Frees the path and body of every finished request and marks its slot as
/// reusable by setting the stream id to -1. The slot's `done` flag and the
/// client's `requests_done` counter are left untouched.
pub fn recycleCompletedRequests(self: *Http2Client) void {
    if (self.requests_done == 0) return;

    var index: usize = 0;
    while (index < self.request_count) : (index += 1) {
        const slot = &self.requests[index];
        const already_recycled = slot.stream_id == -1;
        if (!slot.done or already_recycled) continue;

        if (slot.path) |owned_path| self.allocator.free(owned_path);
        slot.path = null;
        slot.response_body.deinit(self.allocator);
        slot.response_body = .{};
        slot.stream_id = -1;
    }
}
/// Returns a slot for a new request: a recycled one (stream id -1) if any,
/// otherwise the next unused entry of the table. Null when the table is full.
fn findOrAllocSlot(self: *Http2Client) ?*RequestState {
    var index: usize = 0;
    while (index < self.request_count) : (index += 1) {
        const candidate = &self.requests[index];
        if (candidate.stream_id == -1) return candidate;
    }

    if (self.request_count >= MAX_PENDING_REQUESTS) return null;

    const fresh = &self.requests[self.request_count];
    self.request_count += 1;
    return fresh;
}
/// Close callback of the TLS stream. Records the closed state (-2) on the
/// client stored in `tls.data`, which ends the wait loop in `deinit`.
fn onStreamClose(handle: *uv.handle_t) callconv(.c) void {
    const stream: *tlsuv.stream_t = @ptrCast(@alignCast(handle));
    const owner: *Http2Client = @ptrCast(@alignCast(stream.data));
    owner.connect_pending = false;
    owner.connected = -2;
}
/// Looks up the slot in use whose stream id equals `stream_id`; null if none.
fn findRequest(self: *Http2Client, stream_id: i32) ?*RequestState {
    var index: usize = 0;
    while (index < self.request_count) : (index += 1) {
        const slot = &self.requests[index];
        if (slot.stream_id == stream_id) return slot;
    }
    return null;
}
/// nghttp2 send callback. Instead of writing to the socket it appends the
/// serialized frames to the client's `write_buf`; `flush` ships them later.
/// Returns the number of bytes accepted, or ERR_NOMEM if the buffer cannot grow.
fn h2Send(_: ?*nghttp2.session, data: [*c]const u8, len: usize, _: c_int, ud: ?*anyopaque) callconv(.c) isize {
    const client: *Http2Client = @ptrCast(@alignCast(ud));
    const frame_bytes = data[0..len];
    client.write_buf.appendSlice(client.allocator, frame_bytes) catch {
        return nghttp2.ERR_NOMEM;
    };
    return @intCast(len);
}
/// nghttp2 frame-received callback. Completes the matching request when a
/// frame carrying END_STREAM arrives: marks it done, stamps `end_ns`, bumps
/// `requests_done` and invokes `on_complete` with the recorded status.
/// Always returns 0.
fn h2FrameRecv(_: ?*nghttp2.session, frame: *const nghttp2.frame, ud: ?*anyopaque) callconv(.c) c_int {
    const client: *Http2Client = @ptrCast(@alignCast(ud));
    const stream_id = frame.hd.stream_id;

    // Connection-level frames live on stream 0, where flag bit 0x1 means ACK
    // (SETTINGS, PING) rather than END_STREAM. Without this guard such a
    // frame could complete a slot whose stream id is still 0.
    if (stream_id <= 0) return 0;
    if (frame.hd.flags & nghttp2.FLAG_END_STREAM == 0) return 0;

    const req = client.findRequest(stream_id) orelse return 0;
    // h2StreamClose may already have completed this request.
    if (req.done) return 0;

    req.done = true;
    req.end_ns = @intCast(std.time.nanoTimestamp());
    client.requests_done += 1;
    if (req.on_complete) |cb| cb(req.status_code, req.userdata);
    return 0;
}
/// nghttp2 data-chunk callback. Streams the chunk to `on_data` when the
/// request has one, otherwise appends it to the buffered body (flagging the
/// request as failed if that allocation fails). Also counts the bytes and
/// tells nghttp2 they were consumed. Chunks for unknown streams are ignored.
fn h2DataChunk(session: ?*nghttp2.session, _: u8, stream_id: i32, data: [*c]const u8, len: usize, ud: ?*anyopaque) callconv(.c) c_int {
    const client: *Http2Client = @ptrCast(@alignCast(ud));
    const slot = client.findRequest(stream_id) orelse return 0;

    if (slot.on_data) |deliver| {
        deliver(data[0..len], slot.userdata);
    } else {
        slot.response_body.appendSlice(client.allocator, data[0..len]) catch {
            slot.has_error = true;
        };
    }
    slot.bytes += len;

    if (session) |live_session| {
        _ = nghttp2.nghttp2_session_consume(live_session, stream_id, len);
    }
    return 0;
}
/// nghttp2 header callback. Only HEADERS frames of known streams are
/// inspected: ":status" is parsed into `status_code` (0 if unparsable) and a
/// "content-encoding" value starting with "gzip" selects `.gzip`.
fn h2Header(_: ?*nghttp2.session, frame: *const nghttp2.frame, name: [*c]const u8, namelen: usize, value: [*c]const u8, valuelen: usize, _: u8, ud: ?*anyopaque) callconv(.c) c_int {
    const client: *Http2Client = @ptrCast(@alignCast(ud));
    if (frame.hd.type != nghttp2.HEADERS) return 0;
    const slot = client.findRequest(frame.hd.stream_id) orelse return 0;

    const is_status = namelen == 7 and std.mem.eql(u8, name[0..7], ":status");
    if (is_status) {
        slot.status_code = std.fmt.parseInt(u16, value[0..valuelen], 10) catch 0;
    }

    const is_encoding = std.mem.eql(u8, name[0..namelen], "content-encoding");
    if (is_encoding and std.mem.startsWith(u8, value[0..valuelen], "gzip")) {
        slot.content_encoding = .gzip;
    }
    return 0;
}
/// nghttp2 stream-close callback. If the request was not completed yet it is
/// finished here: a non-zero `error_code` flags it as failed and reports
/// `Http2Error` through `on_error`, otherwise `on_complete` receives the status.
fn h2StreamClose(_: ?*nghttp2.session, stream_id: i32, error_code: u32, ud: ?*anyopaque) callconv(.c) c_int {
    const client: *Http2Client = @ptrCast(@alignCast(ud));
    const slot = client.findRequest(stream_id) orelse return 0;
    if (slot.done) return 0;

    slot.done = true;
    slot.end_ns = @intCast(std.time.nanoTimestamp());
    client.requests_done += 1;

    if (error_code == 0) {
        if (slot.on_complete) |notify| notify(slot.status_code, slot.userdata);
        return 0;
    }

    slot.has_error = true;
    if (slot.on_error) |notify| notify(FetchError.Http2Error, slot.userdata);
    return 0;
}
/// Creates the nghttp2 client session, wires up the callbacks above and
/// queues the initial SETTINGS frame plus a connection-level WINDOW_UPDATE.
/// Nothing is written to the socket here; the frames go out with `flush`.
/// Returns `error.Http2Error` if nghttp2 refuses any of the setup steps.
fn initH2(self: *Http2Client) !void {
    var cbs: *nghttp2.session_callbacks = undefined;
    if (nghttp2.nghttp2_session_callbacks_new(&cbs) != 0) return error.Http2Error;
    defer nghttp2.nghttp2_session_callbacks_del(cbs);

    nghttp2.nghttp2_session_callbacks_set_send_callback2(cbs, h2Send);
    nghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, h2FrameRecv);
    nghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, h2DataChunk);
    nghttp2.nghttp2_session_callbacks_set_on_header_callback(cbs, h2Header);
    nghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(cbs, h2StreamClose);

    var new_session: *nghttp2.session = undefined;
    if (nghttp2.nghttp2_session_client_new(&new_session, cbs, self) != 0) {
        return error.Http2Error;
    }
    self.h2_session = new_session;

    // 16 MiB per-stream window.
    var settings = [_]nghttp2.settings_entry{
        .{ .settings_id = nghttp2.SETTINGS_MAX_CONCURRENT_STREAMS, .value = MAX_PENDING_REQUESTS },
        .{ .settings_id = nghttp2.SETTINGS_INITIAL_WINDOW_SIZE, .value = 16 * 1024 * 1024 },
    };
    const settings_rc = nghttp2.nghttp2_submit_settings(self.h2_session.?, nghttp2.FLAG_NONE, &settings, settings.len);
    if (settings_rc != 0) return error.Http2Error;

    // Raise the connection window from the 65535 subtracted here up to the
    // same 16 MiB.
    const conn_window_increase: i32 = (16 * 1024 * 1024) - 65535;
    _ = nghttp2.nghttp2_submit_window_update(self.h2_session.?, nghttp2.FLAG_NONE, 0, conn_window_increase);
}
/// Lets nghttp2 serialize everything it wants to send into `write_buf` and
/// hands the accumulated bytes to the TLS stream as one asynchronous write.
///
/// Returns `error.ConnectionFailed` when the client is closing or the write
/// cannot be started, and `error.OutOfMemory` when the write buffers cannot
/// be allocated (in which case the queued bytes stay in `write_buf`).
fn flush(self: *Http2Client) !void {
    if (self.closing) return error.ConnectionFailed;

    if (self.h2_session) |session| {
        while (nghttp2.nghttp2_session_want_write(session) != 0) {
            if (nghttp2.nghttp2_session_send(session) != 0) break;
        }
    }

    const queued = self.write_buf.items;
    if (queued.len == 0) return;

    // onWrite releases both allocations with std.c.free, so they must come
    // from malloc regardless of which allocator the client was created with.
    const write_alloc = std.heap.raw_c_allocator;
    const data = try write_alloc.dupe(u8, queued);
    errdefer write_alloc.free(data);
    const wr = try write_alloc.create(uv.write_t);
    errdefer write_alloc.destroy(wr);

    // Only drop the queued bytes once nothing above can fail any more.
    self.write_buf.clearRetainingCapacity();

    wr.data = data.ptr;
    var buf = uv.buf_t{ .base = data.ptr, .len = data.len };
    if (tlsuv.tlsuv_stream_write(wr, &self.tls, &buf, onWrite) != 0) {
        // The write never started, so onWrite will not run; the errdefers
        // above free the buffers.
        return error.ConnectionFailed;
    }
}
/// Write-completion callback. Releases the payload stored in `wr.data` and
/// the write request itself with libc `free`; the write status is ignored.
fn onWrite(wr: *uv.write_t, _: c_int) callconv(.c) void {
    const payload: [*]u8 = @ptrCast(wr.data);
    std.c.free(payload);
    const request_mem: *anyopaque = @ptrCast(wr);
    std.c.free(request_mem);
}
/// Read-buffer allocation callback. Provides a malloc'ed buffer of the
/// suggested `size`; `onRead` frees it. On allocation failure the buffer is
/// explicitly reported as empty (null base, zero length) instead of leaving
/// `buf` in whatever state the caller passed in.
fn allocBuf(_: *uv.handle_t, size: usize, buf: *uv.buf_t) callconv(.c) void {
    if (std.c.malloc(size)) |mem| {
        buf.base = @ptrCast(mem);
        buf.len = size;
    } else {
        buf.base = null;
        buf.len = 0;
    }
}
/// Read callback of the TLS stream. The buffer from `allocBuf` is always
/// freed on exit. A negative `nread` fails every request that is not done
/// yet (reporting `ConnectionFailed`); received bytes are fed to nghttp2 and
/// any frames it produces in response are flushed. Nothing happens while the
/// client is closing.
fn onRead(stream: *uv.stream_t, nread: isize, buf: *const uv.buf_t) callconv(.c) void {
    const tls_stream: *tlsuv.stream_t = @ptrCast(@alignCast(stream));
    const client: *Http2Client = @ptrCast(@alignCast(tls_stream.data));
    defer {
        if (buf.base) |mem| std.c.free(mem);
    }

    if (client.closing) return;

    if (nread < 0) {
        for (client.requests[0..client.request_count]) |*slot| {
            if (slot.done) continue;
            slot.done = true;
            slot.has_error = true;
            client.requests_done += 1;
            if (slot.on_error) |notify| notify(FetchError.ConnectionFailed, slot.userdata);
        }
        return;
    }

    if (nread == 0) return;
    if (client.h2_session == null) return;

    _ = nghttp2.nghttp2_session_mem_recv(client.h2_session.?, @ptrCast(buf.base), @intCast(nread));
    client.flush() catch {};
}
/// Connect callback. Frees the `ConnectCtx`, clears `connect_pending` and
/// records the outcome in `connected` (1 on success, -1 on failure). On
/// success the HTTP/2 session is created, reading starts and the initial
/// frames are flushed. If the client started closing while the connect was
/// in flight, the stream is closed instead.
fn onConnect(req: *uv.connect_t, status: c_int) callconv(.c) void {
    const ctx: *ConnectCtx = @ptrCast(@alignCast(req.data));
    const client = ctx.client;
    defer client.allocator.destroy(ctx);

    client.connect_pending = false;
    // onStreamClose and onRead both recover the client from tls.data, so it
    // has to be set before either can be scheduled — including on the
    // closing path below, where it was previously left unset.
    client.tls.data = client;

    if (client.closing) {
        client.connected = -1;
        _ = tlsuv.tlsuv_stream_close(&client.tls, onStreamClose);
        return;
    }

    if (status < 0) {
        client.connected = -1;
        return;
    }

    client.connected = 1;
    client.initH2() catch {
        client.connected = -1;
        return;
    };
    _ = tlsuv.tlsuv_stream_read_start(&client.tls, allocBuf, onRead);
    client.flush() catch {};
}
/// Heap-allocated context of one connect attempt. `req.data` points back at
/// the context so `onConnect` can find the client; `onConnect` destroys it.
const ConnectCtx = struct { client: *Http2Client, req: uv.connect_t };
/// Blocks (by running the event loop) until the connection is established.
/// Starts a connect attempt unless one is already in flight. Returns
/// immediately when already connected; fails with `error.ConnectionFailed`
/// when the client is closing, a previous attempt failed, or this one fails.
fn ensureConnected(self: *Http2Client) !void {
    if (self.closing) return error.ConnectionFailed;
    if (self.connected > 0) return;
    if (self.connected < 0) return error.ConnectionFailed;

    const wait_started: u64 = @intCast(std.time.nanoTimestamp());

    // Starts the connect unless one is already pending.
    try self.initiateConnectAsync();

    var iterations: u32 = 0;
    while (self.connected == 0) : (iterations += 1) {
        _ = uv.uv_run(self.loop, uv.RUN_ONCE);
    }

    _ = debug.timer(" h2: tls connect", wait_started);
    debug.log(" h2: connect loop iterations={d}", .{iterations});

    if (self.connected < 0) return error.ConnectionFailed;
}
/// Starts connecting without waiting for the result; `onConnect` records the
/// outcome later. Does nothing when already connected or when an attempt is
/// in flight. Connects to port 443 when `use_tls` is set, otherwise port 80.
/// Fails with `error.ConnectionFailed` when the client is closing, a previous
/// attempt failed, or the connect cannot be started.
pub fn initiateConnectAsync(self: *Http2Client) !void {
    if (self.closing) return error.ConnectionFailed;
    if (self.connected > 0) return;
    if (self.connected < 0) return error.ConnectionFailed;
    if (self.connect_pending) return;

    const ctx = try self.allocator.create(ConnectCtx);
    errdefer self.allocator.destroy(ctx);
    ctx.* = .{ .client = self, .req = .{} };
    ctx.req.data = ctx;

    const rc = tlsuv.tlsuv_stream_connect(&ctx.req, &self.tls, self.host.ptr, if (self.use_tls) 443 else 80, onConnect);
    if (rc != 0) return error.ConnectionFailed;

    self.connect_pending = true;
}
/// Builds an nghttp2 name/value pair that points at the given strings. The
/// strings are not copied (NV_FLAG_NONE), so they must stay valid for as
/// long as the pair itself is used.
fn makeNv(name: [:0]const u8, value: [:0]const u8) nghttp2.nv {
    const pair = nghttp2.nv{
        .name = @constCast(name.ptr),
        .namelen = name.len,
        .value = @constCast(value.ptr),
        .valuelen = value.len,
        .flags = nghttp2.NV_FLAG_NONE,
    };
    return pair;
}
/// Blocking GET of `path` with "accept: application/json". The returned body
/// is allocated with `allocator` and owned by the caller.
pub fn get(self: *Http2Client, path: []const u8, allocator: std.mem.Allocator) ![]u8 {
    const body = try self.getWithAccept(path, "application/json", allocator);
    return body;
}
/// Blocking GET of `path` with the given `accept` header.
///
/// Connects if necessary, submits the request and runs the event loop until
/// the response is complete. The status code is stored in
/// `last_response_status_code`. Returns a copy of the body allocated with
/// `allocator` (caller owns it).
///
/// Errors: `ConnectionFailed` (no connection / write failure), `Http2Error`
/// (no session or nghttp2 refused the request), `ResponseError` (transport
/// error or status other than 200), `OutOfMemory`.
pub fn getWithAccept(self: *Http2Client, path: []const u8, accept: [:0]const u8, allocator: std.mem.Allocator) ![]u8 {
    try self.ensureConnected();

    // Do everything that can fail before touching the slot table, so a
    // failure never leaves behind a counted slot that can never complete.
    const session = self.h2_session orelse return error.Http2Error;
    const path_z = try self.allocator.dupeZ(u8, path);

    // A full table is wiped wholesale. NOTE(review): this also discards any
    // requests still in flight on this client — confirm callers never mix
    // blocking and streaming requests on one connection.
    if (self.request_count >= MAX_PENDING_REQUESTS) self.resetRequests();

    const req = &self.requests[self.request_count];
    req.* = .{
        .stream_id = 0,
        .path = path_z,
        .on_data = null,
        .on_complete = null,
        .on_error = null,
        .userdata = null,
        .response_body = .{},
        .status_code = 0,
        .done = false,
        .has_error = false,
        .start_ns = @intCast(std.time.nanoTimestamp()),
        .end_ns = 0,
        .bytes = 0,
        .content_encoding = .identity,
    };

    var hdrs = [_]nghttp2.nv{
        makeNv(":method", "GET"),
        makeNv(":path", path_z),
        makeNv(":scheme", "https"),
        makeNv(":authority", self.host),
        makeNv("accept", accept),
        makeNv("user-agent", user_agent),
    };
    const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
    if (sid < 0) {
        self.allocator.free(path_z);
        // deinit walks every slot and frees non-null paths; clear the
        // pointer so the path is not freed a second time.
        req.path = null;
        return error.Http2Error;
    }

    // Only a successfully submitted request occupies a slot.
    req.stream_id = sid;
    self.request_count += 1;

    try self.flush();
    while (!req.done) {
        _ = uv.uv_run(self.loop, uv.RUN_ONCE);
        try self.flush();
    }

    self.last_response_status_code = req.status_code;
    if (req.has_error or req.status_code != 200) return error.ResponseError;
    return try allocator.dupe(u8, req.response_body.items);
}
/// Submits a streaming GET of `path` and returns without waiting. Body
/// chunks go to `on_data`, the final status to `on_complete`, failures to
/// `on_error`; `userdata` is passed through to all three.
///
/// Errors: `ConnectionFailed`, `Http2Error` (no session or nghttp2 refused
/// the request), `OutOfMemory` (allocation failure or no free request slot).
/// When an error is returned before the request was submitted, none of the
/// callbacks will be invoked for it.
pub fn getStream(self: *Http2Client, path: []const u8, on_data: *const fn ([]const u8, ?*anyopaque) void, on_complete: *const fn (u16, ?*anyopaque) void, on_error: *const fn (FetchError, ?*anyopaque) void, userdata: ?*anyopaque) !void {
    try self.ensureConnected();

    // Fallible preparation happens before a slot is claimed so that a
    // failure cannot strand a half-initialised slot in the table.
    const session = self.h2_session orelse return error.Http2Error;
    const path_z = try self.allocator.dupeZ(u8, path);

    const req = self.findOrAllocSlot() orelse {
        self.allocator.free(path_z);
        return error.OutOfMemory;
    };
    req.* = .{
        .stream_id = 0,
        .path = path_z,
        .on_data = on_data,
        .on_complete = on_complete,
        .on_error = on_error,
        .userdata = userdata,
        .response_body = .{},
        .status_code = 0,
        .done = false,
        .has_error = false,
        .start_ns = @intCast(std.time.nanoTimestamp()),
        .end_ns = 0,
        .bytes = 0,
        .content_encoding = .identity,
    };

    var hdrs = [_]nghttp2.nv{
        makeNv(":method", "GET"),
        makeNv(":path", path_z),
        makeNv(":scheme", "https"),
        makeNv(":authority", self.host),
        makeNv("accept", "*/*"),
        makeNv("user-agent", user_agent),
    };
    const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
    if (sid < 0) {
        // Put the slot into the same state as a finished, recycled request.
        // In particular detach the callbacks: onRead's failure sweep visits
        // every slot that is not done and would otherwise report a second
        // error for a request the caller already saw fail.
        self.allocator.free(path_z);
        req.path = null;
        req.on_data = null;
        req.on_complete = null;
        req.on_error = null;
        req.userdata = null;
        req.has_error = true;
        req.done = true;
        self.requests_done += 1;
        req.stream_id = -1;
        return error.Http2Error;
    }
    req.stream_id = sid;

    // NOTE(review): if this flush fails the request stays registered with
    // its callbacks, so the caller may still be notified later.
    try self.flush();
}
/// Runs the event loop until every counted request is done or the loop has
/// nothing left to do, logging progress about once per second. Returns
/// `error.ResponseError` if any request in the table ended with an error;
/// flush errors are propagated.
pub fn run(self: *Http2Client) !void {
    const started_ns: u64 = @intCast(std.time.nanoTimestamp());
    var iterations: u32 = 0;
    var done_at_last_report: usize = 0;
    var last_report_ns: u64 = started_ns;

    while (self.requests_done < self.request_count) {
        const still_active = uv.uv_run(self.loop, uv.RUN_ONCE);
        if (still_active == 0) break;
        try self.flush();
        iterations += 1;

        const now_ns: u64 = @intCast(std.time.nanoTimestamp());
        if (now_ns - last_report_ns <= 1_000_000_000) continue;

        const newly_done = self.requests_done - done_at_last_report;
        debug.log(" h2: progress {d}/{d} (+{d} in last 1s) loops={d}", .{
            self.requests_done,
            self.request_count,
            newly_done,
            iterations,
        });
        done_at_last_report = self.requests_done;
        last_report_ns = now_ns;
    }

    const finished_ns: i128 = std.time.nanoTimestamp();
    const elapsed_ns: u64 = @intCast(finished_ns - @as(i128, started_ns));
    const elapsed_ms = elapsed_ns / 1_000_000;
    debug.log(" h2: run complete in {d}ms, {d} loops, {d}/{d} done", .{
        elapsed_ms,
        iterations,
        self.requests_done,
        self.request_count,
    });

    var failed: usize = 0;
    for (self.requests[0..self.request_count]) |slot| {
        failed += @intFromBool(slot.has_error);
    }
    if (failed == 0) return;

    debug.log(" h2: {d} requests had errors", .{failed});
    return error.ResponseError;
}
};
/// Per-download state of one tarball request. A pointer to it is passed as
/// userdata to the `TarballCallbacks` functions, which forward to `handler`.
pub const TarballCtx = struct {
    /// The caller's callbacks.
    handler: StreamHandler,
    /// Set by the completion and error callbacks; finished contexts are
    /// freed by the fetcher's cleanup passes.
    done: bool,
    has_error: bool,
    /// Heap copy of the URL, freed together with the context.
    url: []const u8,
    /// std.time.nanoTimestamp value taken when the download was dispatched.
    start_ns: u64,
    /// Body bytes received so far.
    bytes: usize,
};
/// Summary of one successfully finished tarball download, kept in
/// `Fetcher.tarball_stats`. `url` is a heap copy owned by the stats list.
const TarballStats = struct {
    url: []const u8,
    bytes: usize,
    elapsed_ms: u64,
};
/// Adapters between the HTTP/2 client's per-request callbacks and a
/// `TarballCtx`: each one forwards to the caller's handler and updates the
/// context's bookkeeping. `ud` must point at a `TarballCtx`.
const TarballCallbacks = struct {
    /// Counts the chunk's bytes, then passes the chunk on.
    fn onData(data: []const u8, ud: ?*anyopaque) void {
        const tarball: *TarballCtx = @ptrCast(@alignCast(ud));
        tarball.bytes += data.len;
        const h = tarball.handler;
        h.on_data(data, h.user_data);
    }

    /// Reports the status to the handler, then marks the download as done.
    fn onComplete(status: u16, ud: ?*anyopaque) void {
        const tarball: *TarballCtx = @ptrCast(@alignCast(ud));
        const h = tarball.handler;
        h.on_complete(status, h.user_data);
        tarball.done = true;
    }

    /// Reports the error to the handler, then marks the download as done
    /// and failed.
    fn onError(err: FetchError, ud: ?*anyopaque) void {
        const tarball: *TarballCtx = @ptrCast(@alignCast(ud));
        const h = tarball.handler;
        h.on_error(err, h.user_data);
        tarball.has_error = true;
        tarball.done = true;
    }
};
pub
const
Fetcher
=
struct
{
allocator
:
std
.
mem
.
Allocator
,
registry_host
:
[]
const
u8
,
meta_clients
:
[
NUM_META_CONNECTIONS
]
?*
Http2Client
,
meta_clients_initialized
:
bool
,
pending
:
std
.
ArrayListUnmanaged
(
PendingRequest
),
tarball_clients
:
[
NUM_CONNECTIONS
]
?*
Http2Client
,
tarball_clients_initialized
:
bool
,
tarball_contexts
:
std
.
ArrayListUnmanaged
(
*
TarballCtx
),
tarball_round_robin
:
usize
,
tarball_stats
:
std
.
ArrayListUnmanaged
(
TarballStats
),
last_http_error_url
:
?
[]
u8
,
last_http_error_status
:
u16
,
/// URL and status code of the most recently recorded HTTP error, as returned
/// by `getLastHttpError`. `url` borrows the fetcher's stored copy.
pub const HttpErrorInfo = struct {
    url: []const u8,
    status: u16,
};
/// Allocates a fetcher for `registry_host`. The host string is copied; no
/// connections are opened yet. Release with `deinit`.
/// Returns `error.OutOfMemory` without leaking if either allocation fails.
pub fn init(allocator: std.mem.Allocator, registry_host: []const u8) !*Fetcher {
    // Copy the host first and guard it, so that neither allocation can be
    // leaked when the other one fails.
    const host_copy = try allocator.dupe(u8, registry_host);
    errdefer allocator.free(host_copy);

    const f = try allocator.create(Fetcher);
    f.* = .{
        .allocator = allocator,
        .registry_host = host_copy,
        .meta_clients = [_]?*Http2Client{null} ** NUM_META_CONNECTIONS,
        .meta_clients_initialized = false,
        .pending = .{},
        .tarball_clients = [_]?*Http2Client{null} ** NUM_CONNECTIONS,
        .tarball_clients_initialized = false,
        .tarball_contexts = .{},
        .tarball_round_robin = 0,
        .tarball_stats = .{},
        .last_http_error_url = null,
        .last_http_error_status = 0,
    };
    return f;
}
/// Tears down both connection pools and frees everything the fetcher owns,
/// including queued requests, in-flight contexts, statistics and `self`.
pub fn deinit(self: *Fetcher) void {
    const alloc = self.allocator;

    if (self.last_http_error_url) |stored_url| alloc.free(stored_url);

    for (&self.meta_clients) |*slot| {
        const client = slot.* orelse continue;
        client.deinit();
        slot.* = null;
    }

    for (self.pending.items) |queued| alloc.free(queued.url);
    self.pending.deinit(alloc);

    for (&self.tarball_clients) |*slot| {
        const client = slot.* orelse continue;
        client.deinit();
        slot.* = null;
    }

    // Contexts are released only after the clients that reference them.
    for (self.tarball_contexts.items) |ctx| {
        alloc.free(ctx.url);
        alloc.destroy(ctx);
    }
    self.tarball_contexts.deinit(alloc);

    for (self.tarball_stats.items) |entry| alloc.free(entry.url);
    self.tarball_stats.deinit(alloc);

    alloc.free(self.registry_host);
    alloc.destroy(self);
}
/// Forgets the recorded HTTP error, freeing the stored URL copy if any.
fn clearLastHttpError(self: *Fetcher) void {
    if (self.last_http_error_url) |stored_url| {
        self.allocator.free(stored_url);
    }
    self.last_http_error_status = 0;
    self.last_http_error_url = null;
}
/// Replaces the recorded HTTP error with a copy of `url` and `status`. If
/// the URL cannot be copied only the status is stored.
fn setLastHttpError(self: *Fetcher, url: []const u8, status: u16) void {
    self.clearLastHttpError();
    const url_copy: ?[]u8 = self.allocator.dupe(u8, url) catch null;
    self.last_http_error_url = url_copy;
    self.last_http_error_status = status;
}
/// Returns the recorded HTTP error, or null when no URL is stored. The
/// returned `url` points into the fetcher's own copy.
pub fn getLastHttpError(self: *const Fetcher) ?HttpErrorInfo {
    if (self.last_http_error_url) |stored_url| {
        return HttpErrorInfo{ .url = stored_url, .status = self.last_http_error_status };
    }
    return null;
}
/// Opens the metadata connection pool on first use. Each connection is
/// created and connected in turn; individual failures are logged and
/// skipped. Fails with `error.ConnectionFailed` only if no slot holds a
/// client afterwards.
fn ensureMetaClients(self: *Fetcher) !void {
    if (self.meta_clients_initialized) return;

    for (&self.meta_clients, 0..) |*slot, index| {
        const candidate = Http2Client.init(self.allocator, self.registry_host, true) catch |err| {
            debug.log("fetcher: failed to init meta connection {d}: {}", .{ index, err });
            continue;
        };
        if (candidate.ensureConnected()) |_| {
            slot.* = candidate;
        } else |err| {
            debug.log("fetcher: failed to connect meta {d}: {}", .{ index, err });
            candidate.deinit();
        }
    }

    const have_client = for (self.meta_clients) |slot| {
        if (slot != null) break true;
    } else false;
    if (!have_client) return error.ConnectionFailed;

    self.meta_clients_initialized = true;
}
/// Drops the recorded HTTP error and shuts down every metadata connection,
/// so that the pool is rebuilt on next use.
pub fn resetMetaClients(self: *Fetcher) void {
    self.clearLastHttpError();

    for (&self.meta_clients) |*slot| {
        const client = slot.* orelse continue;
        client.deinit();
        slot.* = null;
    }

    self.meta_clients_initialized = false;
}
/// Opens the tarball connection pool on first use, connecting each client
/// in turn. Individual failures are logged and skipped; the call fails with
/// `error.ConnectionFailed` only if no slot holds a client afterwards.
fn ensureTarballClients(self: *Fetcher) !void {
    if (self.tarball_clients_initialized) return;

    debug.log("fetcher: initializing {d} persistent connections", .{NUM_CONNECTIONS});
    const pool_started: u64 = @intCast(std.time.nanoTimestamp());

    for (&self.tarball_clients, 0..) |*slot, index| {
        const candidate = Http2Client.init(self.allocator, self.registry_host, true) catch |err| {
            debug.log("fetcher: failed to init connection {d}: {}", .{ index, err });
            continue;
        };
        if (candidate.ensureConnected()) |_| {
            slot.* = candidate;
        } else |err| {
            debug.log("fetcher: failed to connect {d}: {}", .{ index, err });
            candidate.deinit();
        }
    }

    const have_client = for (self.tarball_clients) |slot| {
        if (slot != null) break true;
    } else false;
    if (!have_client) return error.ConnectionFailed;

    self.tarball_clients_initialized = true;
    _ = debug.timer("fetcher: connection pool init", pool_started);
}
/// Picks a tarball connection with a free request slot, scanning the pool
/// once starting at the round-robin position. Returns the client together
/// with its pool index, or null when every connection is full or missing.
fn findAvailableClient(self: *Fetcher) ?struct { client: *Http2Client, idx: usize } {
    for (0..NUM_CONNECTIONS) |offset| {
        const idx = (self.tarball_round_robin + offset) % NUM_CONNECTIONS;
        const candidate = self.tarball_clients[idx] orelse continue;
        if (!candidate.hasCapacity()) continue;
        return .{ .client = candidate, .idx = idx };
    }
    return null;
}
/// Creates the tarball pool and starts all connects without waiting for
/// them. Clients that cannot be created or whose connect cannot be started
/// are skipped silently. The pool counts as initialised as soon as at least
/// one slot holds a client.
pub fn initiateTarballConnectionsAsync(self: *Fetcher) void {
    if (self.tarball_clients_initialized) return;

    debug.log("fetcher: initiating {d} tarball connections (async)", .{NUM_CONNECTIONS});

    for (&self.tarball_clients) |*slot| {
        const candidate = Http2Client.init(self.allocator, self.registry_host, true) catch continue;
        candidate.initiateConnectAsync() catch {
            candidate.deinit();
            continue;
        };
        slot.* = candidate;
    }

    const have_client = for (self.tarball_clients) |slot| {
        if (slot != null) break true;
    } else false;
    if (have_client) self.tarball_clients_initialized = true;
}
/// Starts downloading the tarball at `url`, reporting through `handler`.
/// If every connection is busy the request is queued and dispatched by a
/// later `tick`. `url` is copied; the caller keeps ownership of its string.
///
/// Errors: `ConnectionFailed` (no pool), `InvalidUrl`, `OutOfMemory`, and
/// whatever `Http2Client.getStream` reports. Allocation failures leak nothing.
pub fn queueTarballAsync(self: *Fetcher, url: []const u8, handler: StreamHandler) !void {
    try self.ensureTarballClients();
    const parsed = try ParsedUrl.parse(url);

    const available = self.findAvailableClient() orelse {
        const queued_url = try self.allocator.dupe(u8, url);
        // The queue only owns the copy once the append succeeded.
        errdefer self.allocator.free(queued_url);
        try self.pending.append(self.allocator, .{ .url = queued_url, .handler = handler });
        return;
    };

    const ctx = register: {
        const new_ctx = try self.allocator.create(TarballCtx);
        errdefer self.allocator.destroy(new_ctx);
        const url_copy = try self.allocator.dupe(u8, url);
        errdefer self.allocator.free(url_copy);

        new_ctx.* = .{
            .handler = handler,
            .done = false,
            .has_error = false,
            .url = url_copy,
            .start_ns = @intCast(std.time.nanoTimestamp()),
            .bytes = 0,
        };
        try self.tarball_contexts.append(self.allocator, new_ctx);
        // From here on the context list owns new_ctx and its URL.
        break :register new_ctx;
    };

    // NOTE(review): when getStream fails the context stays registered and
    // not done. It is deliberately not freed here because the client may
    // still hold it as callback userdata — confirm how callers should
    // recover from this error.
    try available.client.getStream(
        parsed.path,
        TarballCallbacks.onData,
        TarballCallbacks.onComplete,
        TarballCallbacks.onError,
        ctx,
    );
    self.tarball_round_robin = (available.idx + 1) % NUM_CONNECTIONS;
}
/// One non-blocking step of the tarball pipeline: flush pending writes, let
/// the event loop process what is ready, recycle finished request slots,
/// free finished contexts and dispatch queued downloads into the freed
/// capacity. Returns how many downloads finished during this step (0 if the
/// pool cannot be set up).
pub fn tick(self: *Fetcher) usize {
    self.ensureTarballClients() catch return 0;

    const loop = uv.uv_default_loop();

    for (&self.tarball_clients) |slot| {
        const client = slot orelse continue;
        client.flush() catch {};
    }

    _ = uv.uv_run(loop, uv.RUN_NOWAIT);

    for (&self.tarball_clients) |slot| {
        const client = slot orelse continue;
        client.recycleCompletedRequests();
    }

    const finished = self.cleanupCompletedContexts();
    self.dispatchPending();
    return finished;
}
/// Frees every context whose download is done and removes it from
/// `tarball_contexts`. Returns the number of contexts removed.
fn cleanupCompletedContexts(self: *Fetcher) usize {
    var removed: usize = 0;
    var index: usize = 0;
    while (index < self.tarball_contexts.items.len) {
        const ctx = self.tarball_contexts.items[index];
        if (!ctx.done) {
            index += 1;
            continue;
        }

        removed += 1;
        self.allocator.free(ctx.url);
        self.allocator.destroy(ctx);
        // swapRemove moves the last element into this position, so the
        // same index has to be examined again.
        _ = self.tarball_contexts.swapRemove(index);
    }
    return removed;
}
/// Moves queued downloads onto connections for as long as one has a free
/// slot. Entries are taken from the end of the queue. Entries without a
/// handler are dropped; a dispatch failure is reported to the handler's
/// `on_error` and the entry's URL is freed.
fn dispatchPending(self: *Fetcher) void {
    while (self.pending.items.len != 0) {
        const target = self.findAvailableClient() orelse return;
        const entry = self.pending.pop() orelse return;

        if (entry.handler) |handler| {
            if (self.dispatchRequest(target.client, entry.url, handler)) |_| {
                // On success the new context has taken over entry.url.
            } else |err| {
                handler.on_error(errToFetchError(err), handler.user_data);
                self.allocator.free(entry.url);
            }
        } else {
            self.allocator.free(entry.url);
        }
    }
}
/// Starts a previously queued download on `client`.
///
/// On success the newly created context takes ownership of `url`. On
/// failure the context is unregistered and destroyed and `url` remains the
/// caller's to free.
fn dispatchRequest(self: *Fetcher, client: *Http2Client, url: []const u8, handler: StreamHandler) !void {
    const parsed = try ParsedUrl.parse(url);

    const ctx = try self.allocator.create(TarballCtx);
    errdefer self.allocator.destroy(ctx);
    ctx.* = .{
        .handler = handler,
        .done = false,
        .has_error = false,
        .url = url,
        .start_ns = @intCast(std.time.nanoTimestamp()),
        .bytes = 0,
    };

    try self.tarball_contexts.append(self.allocator, ctx);
    // errdefers run in reverse order, so on failure the context is removed
    // from the list before it is destroyed; otherwise the list would keep a
    // dangling pointer that the cleanup passes dereference.
    errdefer {
        for (self.tarball_contexts.items, 0..) |registered, index| {
            if (registered == ctx) {
                _ = self.tarball_contexts.swapRemove(index);
                break;
            }
        }
    }

    // NOTE(review): if getStream fails after it stored `ctx` as callback
    // userdata, the client could still call back with the destroyed
    // context — confirm getStream detaches callbacks on every error path.
    try client.getStream(
        parsed.path,
        TarballCallbacks.onData,
        TarballCallbacks.onComplete,
        TarballCallbacks.onError,
        ctx,
    );
}
/// Narrows an arbitrary error to `FetchError` for reporting through
/// `StreamHandler.on_error`. Errors that already belong to `FetchError` are
/// passed through unchanged; anything else is reported as `Http2Error`.
fn errToFetchError(err: anyerror) FetchError {
    return switch (err) {
        error.ConnectionFailed => FetchError.ConnectionFailed,
        error.TlsError => FetchError.TlsError,
        error.Http2Error => FetchError.Http2Error,
        error.Timeout => FetchError.Timeout,
        error.InvalidUrl => FetchError.InvalidUrl,
        error.ResponseError => FetchError.ResponseError,
        error.OutOfMemory => FetchError.OutOfMemory,
        else => FetchError.Http2Error,
    };
}
/// Number of tarball downloads that currently have a registered context.
/// NOTE(review): requests still waiting in `pending` are not included —
/// confirm that callers expect only dispatched downloads to be counted.
pub fn pendingTarballCount(self: *Fetcher) usize {
    const registered = self.tarball_contexts.items;
    return registered.len;
}
/// Drives the default libuv loop until every queued and in-flight tarball
/// request has finished.
///
/// Each iteration:
///   1. flushes all tarball clients,
///   2. runs the loop once (`RUN_ONCE`),
///   3. lets clients recycle completed request slots,
///   4. reaps finished contexts (recording stats for successful ones),
///   5. dispatches pending requests onto clients that report capacity,
///   6. logs progress at most once per second.
///
/// Failures are reported through the request's `StreamHandler.on_error`; this
/// function itself never returns an error. When `debug.enabled` is set, the
/// five slowest and five largest recorded tarballs are logged at the end.
pub fn finishTarballs(self: *Fetcher) void {
    // Stream callbacks handed to `getStream`; `ud` is the `*TarballCtx`
    // passed as the stream's user data.
    const StreamCallbacks = struct {
        fn onData(data: []const u8, ud: ?*anyopaque) void {
            const c: *TarballCtx = @ptrCast(@alignCast(ud));
            c.bytes += data.len;
            c.handler.on_data(data, c.handler.user_data);
        }

        fn onComplete(status: u16, ud: ?*anyopaque) void {
            const c: *TarballCtx = @ptrCast(@alignCast(ud));
            c.handler.on_complete(status, c.handler.user_data);
            if (debug.enabled) {
                const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - c.start_ns) / 1_000_000);
                debug.log(" tarball: done {s} {d}ms {d} bytes status={d}", .{ c.url, elapsed_ms, c.bytes, status });
            }
            c.done = true;
        }

        fn onError(err: FetchError, ud: ?*anyopaque) void {
            const c: *TarballCtx = @ptrCast(@alignCast(ud));
            c.handler.on_error(err, c.handler.user_data);
            if (debug.enabled) {
                const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - c.start_ns) / 1_000_000);
                debug.log(" tarball: error {s} {d}ms {d} bytes", .{ c.url, elapsed_ms, c.bytes });
            }
            c.done = true;
            c.has_error = true;
        }
    };

    // Keeps a five-slot leaderboard sorted descending by `field`.
    const TopFive = struct {
        /// Inserts `stat` at its ranked position, shifting lower entries
        /// down and dropping whatever falls off the end.
        fn insert(slots: *[5]?TarballStats, stat: TarballStats, comptime field: []const u8) void {
            var pos: usize = slots.len;
            for (slots.*, 0..) |slot, idx| {
                if (slot == null or @field(stat, field) > @field(slot.?, field)) {
                    pos = idx;
                    break;
                }
            }
            if (pos == slots.len) return;

            var carry = stat;
            while (pos < slots.len) : (pos += 1) {
                const displaced = slots[pos];
                slots[pos] = carry;
                carry = displaced orelse break;
            }
        }
    };

    const loop = uv.uv_default_loop();
    var last_report: u64 = @intCast(std.time.nanoTimestamp());
    var loops: usize = 0;
    var completed: usize = 0;
    const start = last_report;

    while (self.tarball_contexts.items.len > 0 or self.pending.items.len > 0) {
        for (&self.tarball_clients) |maybe_client| {
            if (maybe_client) |c| c.flush() catch {};
        }

        // A zero return means the loop has nothing left to do; stop if we
        // have nothing left either.
        if (uv.uv_run(loop, uv.RUN_ONCE) == 0 and
            self.pending.items.len == 0 and
            self.tarball_contexts.items.len == 0) break;
        loops += 1;

        for (&self.tarball_clients) |maybe_client| {
            if (maybe_client) |c| c.recycleCompletedRequests();
        }

        // Reap finished contexts. `swapRemove` moves the last element into
        // slot `i`, so the index only advances when nothing was removed.
        var i: usize = 0;
        while (i < self.tarball_contexts.items.len) {
            const ctx = self.tarball_contexts.items[i];
            if (!ctx.done) {
                i += 1;
                continue;
            }

            if (!ctx.has_error) {
                const elapsed_ms: u64 = @intCast((@as(u64, @intCast(std.time.nanoTimestamp())) - ctx.start_ns) / 1_000_000);
                // Stats are best-effort: allocation failures just skip the entry.
                if (self.allocator.dupe(u8, ctx.url) catch null) |url| {
                    self.tarball_stats.append(self.allocator, .{
                        .url = url,
                        .bytes = ctx.bytes,
                        .elapsed_ms = elapsed_ms,
                    }) catch {
                        // The copy has no owner if it could not be stored.
                        self.allocator.free(url);
                    };
                }
            }

            self.allocator.free(ctx.url);
            self.allocator.destroy(ctx);
            _ = self.tarball_contexts.swapRemove(i);
            completed += 1;
        }

        // Hand pending requests to the first client with capacity, one
        // request per pass, until a pass queues nothing.
        while (self.pending.items.len > 0) {
            var queued = false;
            for (&self.tarball_clients) |maybe_client| {
                const client = maybe_client orelse continue;
                if (!client.hasCapacity()) continue;

                const req = self.pending.pop() orelse break;
                if (req.handler) |handler| {
                    const parsed = ParsedUrl.parse(req.url) catch {
                        handler.on_error(FetchError.InvalidUrl, handler.user_data);
                        self.allocator.free(req.url);
                        continue;
                    };
                    const ctx = self.allocator.create(TarballCtx) catch {
                        handler.on_error(FetchError.OutOfMemory, handler.user_data);
                        self.allocator.free(req.url);
                        continue;
                    };
                    // The context takes over ownership of `req.url`; it is
                    // freed when the context is reaped.
                    ctx.* = .{
                        .handler = handler,
                        .done = false,
                        .has_error = false,
                        .url = req.url,
                        .start_ns = @intCast(std.time.nanoTimestamp()),
                        .bytes = 0,
                    };
                    self.tarball_contexts.append(self.allocator, ctx) catch {
                        // Report the failure like every other drop path so
                        // the caller is not left waiting on this request.
                        handler.on_error(FetchError.OutOfMemory, handler.user_data);
                        self.allocator.destroy(ctx);
                        self.allocator.free(req.url);
                        continue;
                    };
                    client.getStream(
                        parsed.path,
                        StreamCallbacks.onData,
                        StreamCallbacks.onComplete,
                        StreamCallbacks.onError,
                        ctx,
                    ) catch {
                        handler.on_error(FetchError.Http2Error, handler.user_data);
                        // Mark as failed so the reaper frees the context
                        // without recording it as a successful download.
                        ctx.done = true;
                        ctx.has_error = true;
                    };
                    queued = true;
                } else {
                    // No handler to deliver to; just release the URL.
                    self.allocator.free(req.url);
                }
                break;
            }
            if (!queued) break;
        }

        const now: u64 = @intCast(std.time.nanoTimestamp());
        if (now - last_report > 1_000_000_000) {
            var total_bytes: usize = 0;
            for (self.tarball_contexts.items) |ctx| {
                total_bytes += ctx.bytes;
            }
            debug.log(" h2: {d} in-flight, {d} pending, {d} completed, {d} loops", .{
                self.tarball_contexts.items.len,
                self.pending.items.len,
                completed,
                loops,
            });
            debug.log(" h2: tarball progress in-flight bytes={d}", .{total_bytes});
            last_report = now;
        }
    }

    const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, start));
    debug.log("fetcher: finishTarballs completed in {d}ms, {d} loops, {d} completed", .{
        elapsed_ns / 1_000_000,
        loops,
        completed,
    });

    if (debug.enabled and self.tarball_stats.items.len > 0) {
        var top_time: [5]?TarballStats = .{null} ** 5;
        var top_size: [5]?TarballStats = .{null} ** 5;
        for (self.tarball_stats.items) |stat| {
            TopFive.insert(&top_time, stat, "elapsed_ms");
            TopFive.insert(&top_size, stat, "bytes");
        }

        debug.log("fetcher: top tarballs by time", .{});
        for (top_time, 0..) |maybe_stat, rank| {
            if (maybe_stat) |stat| {
                debug.log(" {d}. {s} {d}ms {d} bytes", .{ rank + 1, stat.url, stat.elapsed_ms, stat.bytes });
            }
        }

        debug.log("fetcher: top tarballs by size", .{});
        for (top_size, 0..) |maybe_stat, rank| {
            if (maybe_stat) |stat| {
                debug.log(" {d}. {s} {d} bytes {d}ms", .{ rank + 1, stat.url, stat.bytes, stat.elapsed_ms });
            }
        }
    }
}
/// Fetches metadata for `package_name` by delegating to `fetchMetadataFull`
/// with `full = false`. The returned bytes are produced using `allocator`.
pub fn fetchMetadata(self: *Fetcher, package_name: []const u8, allocator: std.mem.Allocator) ![]u8 {
    const want_full_document = false;
    return self.fetchMetadataFull(package_name, want_full_document, allocator);
}
/// Result of `decodeMetadataOwned`: a decoded response body plus a flag
/// recording how it arrived.
const DecodedMetadata = struct {
    /// Response body, duplicated with the allocator passed to
    /// `decodeMetadataOwned`. For gzip responses this is the decompressed data.
    data: []u8,
    /// True when the response had `content_encoding == .gzip`.
    compressed: bool,
};
/// Reports whether `c` can accept another metadata request: it must have an
/// HTTP/2 session, be connected (`connected == 1`), and have fewer than
/// `MAX_PENDING_REQUESTS - 1` requests recorded.
fn metaClientCanQueue(c: *const Http2Client) bool {
    if (c.h2_session == null) return false;
    if (c.connected != 1) return false;
    return c.request_count < MAX_PENDING_REQUESTS - 1;
}
/// Round-robin search for a metadata client that can queue a request.
///
/// Starts at `conn_idx.*` and probes at most `NUM_META_CONNECTIONS` slots,
/// advancing `conn_idx.*` (with wrap-around) past every slot that is empty or
/// cannot queue. On success `conn_idx.*` is left pointing at the returned
/// client's slot. Returns null when no slot qualifies.
fn nextMetaClient(self: *Fetcher, conn_idx: *usize) ?*Http2Client {
    for (0..NUM_META_CONNECTIONS) |_| {
        const slot = conn_idx.*;
        if (self.meta_clients[slot]) |candidate| {
            if (metaClientCanQueue(candidate)) return candidate;
        }
        conn_idx.* = (slot + 1) % NUM_META_CONNECTIONS;
    }
    return null;
}
/// Calls `flush` on every existing metadata client. Flush errors are
/// deliberately ignored (best-effort).
fn flushMetaClients(self: *Fetcher) void {
    for (self.meta_clients) |slot| {
        const client = slot orelse continue;
        client.flush() catch {};
    }
}
/// Returns true when every recorded request on every metadata client has
/// either finished (`done`) or failed (`has_error`). Trivially true when no
/// requests are recorded.
fn metaRequestsComplete(self: *Fetcher) bool {
    for (self.meta_clients) |slot| {
        const client = slot orelse continue;
        for (client.requests[0..client.request_count]) |*entry| {
            const settled = entry.done or entry.has_error;
            if (!settled) return false;
        }
    }
    return true;
}
/// Produces an owned copy of a finished request's body, decompressing it
/// first when the response was gzip-encoded.
///
/// Returns null when the request failed, the status is not 200, or any
/// allocation / decompression step fails. `decompress_buf` is scratch space
/// that is cleared and refilled here (it grows via `c_allocator`); the
/// returned `data` is duplicated with `allocator`.
fn decodeMetadataOwned(
    req: *Http2Client.RequestState,
    allocator: std.mem.Allocator,
    decompress_buf: *std.ArrayListUnmanaged(u8),
) ?DecodedMetadata {
    // Decompressor callback: appends each inflated chunk to the scratch list
    // passed as `ctx`.
    const ChunkSink = struct {
        fn append(chunk: []const u8, ctx: ?*anyopaque) anyerror!void {
            const sink: *std.ArrayListUnmanaged(u8) = @ptrCast(@alignCast(ctx));
            try sink.appendSlice(c_allocator, chunk);
        }
    };

    const usable = !req.has_error and req.status_code == 200;
    if (!usable) return null;

    if (req.content_encoding == .gzip) {
        decompress_buf.clearRetainingCapacity();

        const inflater = extractor.GzipDecompressor.init(allocator) catch return null;
        defer inflater.deinit();

        _ = inflater.decompress(req.response_body.items, ChunkSink.append, decompress_buf) catch return null;

        const inflated = allocator.dupe(u8, decompress_buf.items) catch return null;
        return .{ .data = inflated, .compressed = true };
    }

    const body = allocator.dupe(u8, req.response_body.items) catch return null;
    return .{ .data = body, .compressed = false };
}
/// Fetches the document at `/{package_name}` through the first available
/// metadata client.
///
/// `full` selects the Accept header: `application/json` when true, otherwise
/// `application/vnd.npm.install-v1+json`.
///
/// Errors:
///   - whatever `ensureMetaClients` or `getWithAccept` return;
///   - `error.OutOfMemory` when the request path does not fit the 512-byte
///     path buffer;
///   - `error.ConnectionFailed` when no metadata client exists.
/// On `error.ResponseError` the URL and the client's last response status are
/// recorded via `setLastHttpError` before the error is returned.
pub fn fetchMetadataFull(self: *Fetcher, package_name: []const u8, full: bool, allocator: std.mem.Allocator) ![]u8 {
    try self.ensureMetaClients();
    self.clearLastHttpError();

    const accept: [:0]const u8 = if (full)
        "application/json"
    else
        "application/vnd.npm.install-v1+json";

    for (self.meta_clients) |slot| {
        const client = slot orelse continue;

        var path_storage: [512]u8 = undefined;
        const request_path = std.fmt.bufPrint(&path_storage, "/{s}", .{package_name}) catch
            return error.OutOfMemory;

        // Only the first existing client is tried; its outcome is final.
        if (client.getWithAccept(request_path, accept, allocator)) |body| {
            return body;
        } else |err| {
            if (err == error.ResponseError) {
                var url_storage: [1024]u8 = undefined;
                // Falls back to an empty URL if it does not fit the buffer.
                const failed_url = std.fmt.bufPrint(&url_storage, "https://{s}/{s}", .{ self.registry_host, package_name }) catch "";
                self.setLastHttpError(failed_url, client.last_response_status_code);
            }
            return err;
        }
    }
    return error.ConnectionFailed;
}
/// Per-package outcome returned by `fetchMetadataBatch`.
pub const MetadataResult = struct {
    /// Package name, taken directly from the `names` slice passed to
    /// `fetchMetadataBatch` (not copied).
    name: []const u8,
    /// Decoded response body, or null when nothing was stored. When set it
    /// was allocated with the allocator given to `fetchMetadataBatch`.
    data: ?[]u8,
    /// True when the response was gzip-encoded (`data` is already decompressed).
    compressed: bool,
    /// True when the request could not be queued or its response could not
    /// be decoded.
    has_error: bool,
};
/// Decodes `req` via `decodeMetadataOwned` and stores the outcome in `result`.
/// Returns true and fills `data` / `compressed` on success; otherwise sets
/// `result.has_error` and returns false.
fn storeMetadataBatchResult(
    req: *Http2Client.RequestState,
    result: *MetadataResult,
    allocator: std.mem.Allocator,
    decompress_buf: *std.ArrayListUnmanaged(u8),
) bool {
    if (decodeMetadataOwned(req, allocator, decompress_buf)) |payload| {
        result.data = payload.data;
        result.compressed = payload.compressed;
        return true;
    }
    result.has_error = true;
    return false;
}
/// Fetches install metadata for every entry of `names` over the metadata
/// HTTP/2 connections and returns one `MetadataResult` per name, in the same
/// order.
///
/// Names are processed in batches sized to the combined request capacity of
/// the existing connections. Within a batch each request is submitted to the
/// next client that can queue, the default libuv loop is run until all
/// requests settle, and the responses are decoded into `results`.
///
/// Per-name failures (no client available, path too long, allocation or
/// submit failure, bad status, decode failure) are reported through
/// `MetadataResult.has_error`; they do not fail the call.
///
/// Errors: whatever `ensureMetaClients` returns, `error.ConnectionFailed`
/// when no metadata client exists, or an allocation error for the result
/// slice. The returned slice and each `data` buffer are allocated with
/// `allocator`.
pub fn fetchMetadataBatch(self: *Fetcher, names: []const []const u8, allocator: std.mem.Allocator) ![]MetadataResult {
    if (names.len == 0) return &[_]MetadataResult{};

    var total_start: u64 = @intCast(std.time.nanoTimestamp());
    try self.ensureMetaClients();
    total_start = debug.timer(" meta: get clients", total_start);

    var active_connections: usize = 0;
    for (self.meta_clients) |maybe_client| {
        if (maybe_client != null) active_connections += 1;
    }
    if (active_connections == 0) return error.ConnectionFailed;
    debug.log(" meta: batch {d} packages across {d} connections", .{ names.len, active_connections });

    var results = try allocator.alloc(MetadataResult, names.len);
    for (results, 0..) |*r, i| {
        r.* = .{ .name = names[i], .data = null, .compressed = false, .has_error = false };
    }

    const total_capacity = active_connections * (MAX_PENDING_REQUESTS - 1);
    var offset: usize = 0;
    var batch_num: usize = 0;
    // Scratch buffer shared by all gzip decodes; it grows via `c_allocator`.
    var decompress_buf = std.ArrayListUnmanaged(u8){};
    defer decompress_buf.deinit(c_allocator);

    while (offset < names.len) {
        const end = @min(offset + total_capacity, names.len);
        var batch_start: u64 = @intCast(std.time.nanoTimestamp());
        debug.log(" meta: batch {d} ({d}-{d})", .{ batch_num, offset, end });

        // --- Queue one request per name in this batch. ---
        var queued: usize = 0;
        var conn_idx: usize = 0;
        for (offset..end) |i| {
            const result = &results[i];
            const name = names[i];

            const c = self.nextMetaClient(&conn_idx) orelse {
                result.has_error = true;
                continue;
            };
            const session = c.h2_session orelse {
                result.has_error = true;
                continue;
            };

            var path_buf: [512]u8 = undefined;
            const path = std.fmt.bufPrint(&path_buf, "/{s}", .{name}) catch {
                result.has_error = true;
                continue;
            };
            // The header needs storage that outlives this iteration; the
            // copy is recorded in `req.path` below.
            const owned_path = c.allocator.dupeZ(u8, path) catch {
                result.has_error = true;
                continue;
            };

            var hdrs = [_]nghttp2.nv{
                Http2Client.makeNv(":method", "GET"),
                Http2Client.makeNv(":path", owned_path),
                Http2Client.makeNv(":scheme", "https"),
                Http2Client.makeNv(":authority", c.host),
                Http2Client.makeNv("accept", "application/vnd.npm.install-v1+json"),
                Http2Client.makeNv("accept-encoding", "gzip"),
                Http2Client.makeNv("user-agent", user_agent),
            };

            const req = &c.requests[c.request_count];
            c.request_count += 1;
            req.* = .{
                .stream_id = 0,
                .path = hdrs[1].value[0..hdrs[1].valuelen :0],
                .on_data = null,
                .on_complete = null,
                .on_error = null,
                .userdata = result,
                .response_body = .{},
                .status_code = 0,
                .done = false,
                .has_error = false,
                .start_ns = @intCast(std.time.nanoTimestamp()),
                .end_ns = 0,
                .bytes = 0,
                .content_encoding = .identity,
            };

            const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
            if (sid < 0) {
                // Give the slot back and release the path copy.
                c.request_count -= 1;
                if (req.path) |p| c.allocator.free(p);
                result.has_error = true;
                continue;
            }
            req.stream_id = sid;
            queued += 1;
            conn_idx = (conn_idx + 1) % NUM_META_CONNECTIONS;
        }
        batch_start = debug.timer(" meta: queue requests", batch_start);

        // --- Drive the loop until every queued request settles. ---
        self.flushMetaClients();
        const loop = uv.uv_default_loop();
        var loops: usize = 0;
        const run_start: u64 = @intCast(std.time.nanoTimestamp());
        // Check completion *before* running the loop: `RUN_ONCE` blocks when
        // there is nothing to process, so an empty batch must not enter it.
        while (!self.metaRequestsComplete()) {
            const more_work = uv.uv_run(loop, uv.RUN_ONCE);
            loops += 1;
            // Zero means the loop has no work left; nothing can complete the
            // remaining requests, so stop instead of spinning forever.
            if (more_work == 0) break;
        }
        const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
        debug.log(" h2: run complete in {d}ms, {d} loops", .{ elapsed_ns / 1_000_000, loops });
        batch_start = debug.timer(" meta: run h2 loop", batch_start);

        // --- Timing summary for the batch. ---
        var slow_count: usize = 0;
        var max_req_ms: u64 = 0;
        var max_req_name: []const u8 = "";
        var total_bytes: usize = 0;
        for (self.meta_clients) |maybe_client| {
            const c = maybe_client orelse continue;
            for (c.requests[0..c.request_count]) |*req| {
                const result: *MetadataResult = @ptrCast(@alignCast(req.userdata));
                const end_ns = if (req.end_ns != 0) req.end_ns else @as(u64, @intCast(std.time.nanoTimestamp()));
                const duration_ms: u64 = @intCast((end_ns - req.start_ns) / 1_000_000);
                total_bytes += req.response_body.items.len;
                if (duration_ms > max_req_ms) {
                    max_req_ms = duration_ms;
                    max_req_name = result.name;
                }
                if (duration_ms >= META_SLOW_LOG_MS) {
                    slow_count += 1;
                    debug.log(" meta: slow {s} {d}ms {d} bytes status={d}", .{
                        result.name,
                        duration_ms,
                        req.response_body.items.len,
                        req.status_code,
                    });
                }
            }
        }
        debug.log(" meta: summary slow={d} max={s} {d}ms total_bytes={d}", .{ slow_count, max_req_name, max_req_ms, total_bytes });

        // --- Decode responses into `results` and release request slots. ---
        var success: usize = 0;
        for (self.meta_clients) |maybe_client| {
            const c = maybe_client orelse continue;
            for (c.requests[0..c.request_count]) |*req| {
                // Only reachable when the wait loop gave up early: a request
                // that never finished must not be decoded from a partial body.
                if (!req.done) req.has_error = true;
                const result: *MetadataResult = @ptrCast(@alignCast(req.userdata));
                if (storeMetadataBatchResult(req, result, allocator, &decompress_buf)) success += 1;
            }
            c.resetRequests();
        }
        _ = debug.timer(" meta: copy results", batch_start);
        debug.log(" meta: queued={d} success={d}", .{ queued, success });

        offset = end;
        batch_num += 1;
    }
    return results;
}
/// Callback invoked by `fetchMetadataStreaming` once per settled package.
///
/// - `name`: the package name as passed in `names`.
/// - `data`: the (decompressed) response body, or null on failure. The bytes
///   come from the request's response buffer or the shared decompression
///   buffer, both of which are reused after the callback returns, so copy
///   anything that must outlive the call.
/// - `has_error`: true when the request failed, returned a non-200 status, or
///   could not be decompressed.
/// - `userdata`: the opaque pointer given to `fetchMetadataStreaming`.
pub const MetadataCallback = *const fn (name: []const u8, data: ?[]const u8, has_error: bool, userdata: ?*anyopaque) void;
/// Per-name bookkeeping attached to a streaming request as its `userdata`.
const MetadataStreamTracker = struct {
    /// Package name this request is for.
    name: []const u8,
    /// Position of the name in the caller's `names` slice; also the index
    /// into the `processed` flags.
    index: usize,
};
/// Delivers the outcome of one settled request to `callback`.
///
/// Failure (`has_error`, non-200 status, decompressor init or decompress
/// error) is reported as `(name, null, true)`. A non-gzip body is passed
/// through as-is; a gzip body is inflated into `decompress_buf` (cleared
/// first, grown via `c_allocator`) and that buffer's contents are passed.
/// Exactly one callback invocation happens per call.
fn emitMetadataStreamingResult(
    req: *Http2Client.RequestState,
    name: []const u8,
    allocator: std.mem.Allocator,
    decompress_buf: *std.ArrayListUnmanaged(u8),
    callback: MetadataCallback,
    userdata: ?*anyopaque,
) void {
    // Decompressor callback: appends each inflated chunk to the list passed
    // as `ctx`.
    const ChunkSink = struct {
        fn append(chunk: []const u8, ctx: ?*anyopaque) anyerror!void {
            const sink: *std.ArrayListUnmanaged(u8) = @ptrCast(@alignCast(ctx));
            try sink.appendSlice(c_allocator, chunk);
        }
    };

    const request_ok = !req.has_error and req.status_code == 200;
    if (!request_ok) {
        callback(name, null, true, userdata);
        return;
    }

    const raw_body = req.response_body.items;
    if (req.content_encoding != .gzip) {
        callback(name, raw_body, false, userdata);
        return;
    }

    decompress_buf.clearRetainingCapacity();
    const inflater = extractor.GzipDecompressor.init(allocator) catch {
        callback(name, null, true, userdata);
        return;
    };
    // Released only after the callback below has returned.
    defer inflater.deinit();

    if (inflater.decompress(raw_body, ChunkSink.append, decompress_buf)) |_| {
        callback(name, decompress_buf.items, false, userdata);
    } else |_| {
        callback(name, null, true, userdata);
    }
}
/// Scans every metadata client's recorded requests and fires `callback` for
/// each one that has settled (`done` or `has_error`) and has not been
/// reported yet. `processed` is indexed by `MetadataStreamTracker.index` and
/// is updated so each name is reported at most once.
fn emitCompletedStreamingMetadataCallbacks(
    self: *Fetcher,
    processed: []bool,
    allocator: std.mem.Allocator,
    decompress_buf: *std.ArrayListUnmanaged(u8),
    callback: MetadataCallback,
    userdata: ?*anyopaque,
) void {
    for (self.meta_clients) |slot| {
        if (slot) |client| {
            for (client.requests[0..client.request_count]) |*entry| {
                const settled = entry.done or entry.has_error;
                if (!settled) continue;

                const info: *MetadataStreamTracker = @ptrCast(@alignCast(entry.userdata));
                const already_reported = &processed[info.index];
                if (already_reported.*) continue;

                // Flag before invoking so the entry is never emitted twice.
                already_reported.* = true;
                emitMetadataStreamingResult(entry, info.name, allocator, decompress_buf, callback, userdata);
            }
        }
    }
}
/// Fetches install metadata for every entry of `names` and reports each
/// outcome through `callback` as soon as its request settles.
///
/// Names are processed in batches sized to the combined request capacity of
/// the existing connections. `callback` is invoked exactly once per name:
/// with the response body on success, or with `data = null, has_error = true`
/// when the request failed, could not be queued, or never completed.
///
/// `allocator` is used for the bookkeeping arrays and the gzip decompressor;
/// the data handed to `callback` is only valid for the duration of the call.
///
/// Errors: whatever `ensureMetaClients` returns, `error.ConnectionFailed`
/// when no metadata client exists, or an allocation error for the
/// bookkeeping arrays.
pub fn fetchMetadataStreaming(
    self: *Fetcher,
    names: []const []const u8,
    allocator: std.mem.Allocator,
    callback: MetadataCallback,
    userdata: ?*anyopaque,
) !void {
    if (names.len == 0) return;

    var total_start: u64 = @intCast(std.time.nanoTimestamp());
    try self.ensureMetaClients();
    total_start = debug.timer(" meta: get clients", total_start);

    var active_connections: usize = 0;
    for (self.meta_clients) |maybe_client| {
        if (maybe_client != null) active_connections += 1;
    }
    if (active_connections == 0) return error.ConnectionFailed;
    debug.log(" meta: streaming {d} packages across {d} connections", .{ names.len, active_connections });

    // processed[i] is set once the callback for names[i] has been invoked.
    const processed = try allocator.alloc(bool, names.len);
    defer allocator.free(processed);
    @memset(processed, false);

    // One tracker per name; requests point at these through `userdata`.
    var trackers = try allocator.alloc(MetadataStreamTracker, names.len);
    defer allocator.free(trackers);
    for (names, 0..) |name, i| {
        trackers[i] = .{ .name = name, .index = i };
    }

    const total_capacity = active_connections * (MAX_PENDING_REQUESTS - 1);
    var offset: usize = 0;
    var batch_num: usize = 0;
    // Scratch buffer shared by all gzip decodes; it grows via `c_allocator`.
    var decompress_buf = std.ArrayListUnmanaged(u8){};
    defer decompress_buf.deinit(c_allocator);

    while (offset < names.len) {
        const end = @min(offset + total_capacity, names.len);
        var batch_start: u64 = @intCast(std.time.nanoTimestamp());
        debug.log(" meta: batch {d} ({d}-{d})", .{ batch_num, offset, end });

        // --- Queue one request per name in this batch. Names that cannot
        // be queued are skipped here and reported as errors by the sweep
        // after the wait loop. ---
        var conn_idx: usize = 0;
        for (offset..end) |i| {
            const tracker = &trackers[i];
            const name = names[i];

            const c = self.nextMetaClient(&conn_idx) orelse continue;
            const session = c.h2_session orelse continue;

            var path_buf: [512]u8 = undefined;
            const path = std.fmt.bufPrint(&path_buf, "/{s}", .{name}) catch continue;
            // The header needs storage that outlives this iteration; the
            // copy is recorded in `req.path` below.
            const owned_path = c.allocator.dupeZ(u8, path) catch continue;

            var hdrs = [_]nghttp2.nv{
                Http2Client.makeNv(":method", "GET"),
                Http2Client.makeNv(":path", owned_path),
                Http2Client.makeNv(":scheme", "https"),
                Http2Client.makeNv(":authority", c.host),
                Http2Client.makeNv("accept", "application/vnd.npm.install-v1+json"),
                Http2Client.makeNv("accept-encoding", "gzip"),
                Http2Client.makeNv("user-agent", user_agent),
            };

            const req = &c.requests[c.request_count];
            c.request_count += 1;
            req.* = .{
                .stream_id = 0,
                .path = hdrs[1].value[0..hdrs[1].valuelen :0],
                .on_data = null,
                .on_complete = null,
                .on_error = null,
                .userdata = tracker,
                .response_body = .{},
                .status_code = 0,
                .done = false,
                .has_error = false,
                .start_ns = @intCast(std.time.nanoTimestamp()),
                .end_ns = 0,
                .bytes = 0,
                .content_encoding = .identity,
            };

            const sid = nghttp2.nghttp2_submit_request(session, null, &hdrs, hdrs.len, null, req);
            if (sid < 0) {
                // Give the slot back and release the path copy.
                c.request_count -= 1;
                if (req.path) |p| c.allocator.free(p);
                continue;
            }
            req.stream_id = sid;
            conn_idx = (conn_idx + 1) % NUM_META_CONNECTIONS;
        }
        batch_start = debug.timer(" meta: queue requests", batch_start);

        // --- Drive the loop, emitting callbacks as requests settle. ---
        self.flushMetaClients();
        const loop = uv.uv_default_loop();
        var loops: usize = 0;
        const run_start: u64 = @intCast(std.time.nanoTimestamp());
        // Check completion *before* running the loop: `RUN_ONCE` blocks when
        // there is nothing to process, so an empty batch must not enter it.
        while (!self.metaRequestsComplete()) {
            const more_work = uv.uv_run(loop, uv.RUN_ONCE);
            loops += 1;
            self.emitCompletedStreamingMetadataCallbacks(processed, allocator, &decompress_buf, callback, userdata);
            // Zero means the loop has no work left; nothing can complete the
            // remaining requests, so stop instead of spinning forever.
            if (more_work == 0) break;
        }
        const elapsed_ns: u64 = @intCast(@as(i128, @intCast(std.time.nanoTimestamp())) - @as(i128, run_start));
        debug.log(" h2: run complete in {d}ms, {d} loops", .{ elapsed_ns / 1_000_000, loops });

        // Every name in the batch must produce exactly one callback. Anything
        // still unreported either failed to queue or never completed.
        for (offset..end) |i| {
            if (processed[i]) continue;
            processed[i] = true;
            callback(names[i], null, true, userdata);
        }

        // --- Timing summary for the batch. ---
        var slow_count: usize = 0;
        var max_req_ms: u64 = 0;
        var max_req_name: []const u8 = "";
        var total_bytes: usize = 0;
        for (self.meta_clients) |maybe_client| {
            const c = maybe_client orelse continue;
            for (c.requests[0..c.request_count]) |*req| {
                const tracker: *MetadataStreamTracker = @ptrCast(@alignCast(req.userdata));
                const end_ns = if (req.end_ns != 0) req.end_ns else @as(u64, @intCast(std.time.nanoTimestamp()));
                const duration_ms: u64 = @intCast((end_ns - req.start_ns) / 1_000_000);
                total_bytes += req.response_body.items.len;
                if (duration_ms > max_req_ms) {
                    max_req_ms = duration_ms;
                    max_req_name = tracker.name;
                }
                if (duration_ms >= META_SLOW_LOG_MS) {
                    slow_count += 1;
                    debug.log(" meta: slow {s} {d}ms {d} bytes status={d}", .{
                        tracker.name,
                        duration_ms,
                        req.response_body.items.len,
                        req.status_code,
                    });
                }
            }
        }
        debug.log(" meta: summary slow={d} max={s} {d}ms total_bytes={d}", .{ slow_count, max_req_name, max_req_ms, total_bytes });

        for (self.meta_clients) |maybe_client| {
            if (maybe_client) |c| c.resetRequests();
        }

        offset = end;
        batch_num += 1;
    }
}
/// Queues a tarball download. The request is only recorded in `self.pending`
/// here; this function performs no network activity itself.
///
/// `url` is copied with `self.allocator`, so the caller keeps ownership of
/// its slice. Returns an allocation error if the copy or the queue append
/// fails; in that case nothing is queued and the copy is released.
pub fn fetchTarball(self: *Fetcher, url: []const u8, handler: StreamHandler) !void {
    const owned_url = try self.allocator.dupe(u8, url);
    // Without this the copy would leak when the append below fails.
    errdefer self.allocator.free(owned_url);

    try self.pending.append(self.allocator, .{ .url = owned_url, .handler = handler });
}
/// Downloads everything that is queued or already in flight.
///
/// Returns immediately when there is nothing to do. Otherwise it makes sure
/// the tarball clients exist (propagating `ensureTarballClients` errors),
/// calls `finishTarballs`, and logs the request count and elapsed time.
pub fn run(self: *Fetcher) !void {
    const pending_count = self.pending.items.len;
    const in_flight_count = self.tarball_contexts.items.len;
    if (pending_count == 0 and in_flight_count == 0) return;

    const run_start: u64 = @intCast(std.time.nanoTimestamp());
    const total_requests = pending_count + in_flight_count;
    debug.log("fetcher: {d} tarballs to download (pending={d}, in-flight={d})", .{
        total_requests,
        pending_count,
        in_flight_count,
    });

    try self.ensureTarballClients();
    self.finishTarballs();

    const finished_at: i128 = std.time.nanoTimestamp();
    const elapsed_ns: u64 = @intCast(finished_at - @as(i128, run_start));
    const elapsed_ms = elapsed_ns / 1_000_000;
    debug.log("fetcher: {d} tarballs complete in {d}ms", .{ total_requests, elapsed_ms });
}
};
File Metadata
Details
Attached
Mime Type
text/x-asm
Expires
Thu, Mar 26, 4:46 PM (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
511732
Default Alt Text
fetcher.zig (55 KB)
Attached To
Mode
rANT Ant
Attached
Detach File
Event Timeline
Log In to Comment