Page MenuHomePhorge

pipes.c
No OneTemporary

Size
27 KB
Referenced Files
None
Subscribers
None
#include <string.h>
#include "ant.h"
#include "errors.h"
#include "internal.h"
#include "descriptors.h"
#include "silver/engine.h"
#include "modules/assert.h"
#include "modules/abort.h"
#include "streams/pipes.h"
#include "streams/readable.h"
#include "streams/writable.h"
#include "modules/structured-clone.h"
static void pipes_chain_promise(
ant_t *js, ant_value_t value,
ant_value_t on_resolve, ant_value_t on_reject
) {
ant_value_t promise = value;
if (vtype(promise) != T_PROMISE) {
promise = js_mkpromise(js);
js_resolve_promise(js, promise, value);
}
js_promise_then(js, promise, on_resolve, on_reject);
}
typedef struct {
bool settled;
bool shutting_down;
bool in_flight;
bool prevent_close;
bool prevent_abort;
bool prevent_cancel;
} pipe_state_t;
static void pipe_state_finalize(ant_t *js, ant_object_t *obj) {
if (!obj->extra_slots) return;
ant_extra_slot_t *entries = (ant_extra_slot_t *)obj->extra_slots;
for (uint8_t i = 0; i < obj->extra_count; i++) {
if (entries[i].slot == SLOT_DATA && vtype(entries[i].value) == T_NUM) {
free((pipe_state_t *)(uintptr_t)(size_t)js_getnum(entries[i].value));
return;
}}
}
static pipe_state_t *pipe_get_state(ant_value_t state) {
ant_value_t s = js_get_slot(state, SLOT_DATA);
if (vtype(s) != T_NUM) return NULL;
return (pipe_state_t *)(uintptr_t)(size_t)js_getnum(s);
}
static ant_value_t pipe_state_source(ant_value_t state) {
return js_get_slot(state, SLOT_ENTRIES);
}
static ant_value_t pipe_state_dest(ant_value_t state) {
return js_get_slot(state, SLOT_CTOR);
}
static ant_value_t pipe_state_reader(ant_value_t state) {
return js_get_slot(state, SLOT_BUFFER);
}
static ant_value_t pipe_state_writer(ant_value_t state) {
return js_get_slot(state, SLOT_DEFAULT);
}
static ant_value_t pipe_state_promise(ant_value_t state) {
return js_get_slot(state, SLOT_RS_PULL);
}
static void pipes_release_reader(ant_t *js, ant_value_t reader_obj) {
ant_value_t stream_obj = rs_reader_stream(reader_obj);
if (!rs_is_stream(stream_obj)) return;
if (rs_reader_has_reqs(js, reader_obj)) {
ant_value_t release_err = js_make_error_silent(js, JS_ERR_TYPE, "Reader was released");
rs_default_reader_error_read_requests(js, reader_obj, release_err);
}
ant_value_t old_closed = rs_reader_closed(reader_obj);
ant_value_t new_closed = js_mkpromise(js);
ant_value_t release_err = js_make_error_silent(js, JS_ERR_TYPE, "Reader was released");
rs_stream_t *rs = rs_get_stream(stream_obj);
if (rs && rs->state == RS_STATE_READABLE) {
js_reject_promise(js, old_closed, release_err);
promise_mark_handled(old_closed);
}
js_reject_promise(js, new_closed, release_err);
promise_mark_handled(new_closed);
js_set_slot(reader_obj, SLOT_RS_CLOSED, new_closed);
js_set_slot(stream_obj, SLOT_CTOR, js_mkundef());
js_set_slot(reader_obj, SLOT_ENTRIES, js_mkundef());
}
static void pipes_release_writer(ant_t *js, ant_value_t writer_obj) {
ant_value_t ws_obj = js_get_slot(writer_obj, SLOT_ENTRIES);
if (!ws_is_stream(ws_obj)) return;
ant_value_t rel_err = js_make_error_silent(js, JS_ERR_TYPE, "Writer was released");
ant_value_t ready = js_mkpromise(js);
js_reject_promise(js, ready, rel_err);
promise_mark_handled(ready);
js_set_slot(writer_obj, SLOT_WS_READY, ready);
ant_value_t closed = js_mkpromise(js);
js_reject_promise(js, closed, rel_err);
promise_mark_handled(closed);
js_set_slot(writer_obj, SLOT_RS_CLOSED, closed);
js_set_slot(ws_obj, SLOT_CTOR, js_mkundef());
js_set_slot(writer_obj, SLOT_ENTRIES, js_mkundef());
}
static void pipes_release_locks(ant_t *js, ant_value_t state) {
ant_value_t reader = pipe_state_reader(state);
if (rs_is_reader(reader)) {
pipes_release_reader(js, reader);
js_set_slot(state, SLOT_BUFFER, js_mkundef());
}
ant_value_t writer = pipe_state_writer(state);
if (ws_is_writer(writer)) {
pipes_release_writer(js, writer);
js_set_slot(state, SLOT_DEFAULT, js_mkundef());
}
}
static void pipes_settle(ant_t *js, ant_value_t state, bool ok, ant_value_t value) {
pipe_state_t *pst = pipe_get_state(state);
if (!pst || pst->settled) return;
pst->settled = true;
pst->shutting_down = true;
pipes_release_locks(js, state);
ant_value_t promise = pipe_state_promise(state);
if (ok) js_resolve_promise(js, promise, value);
else js_reject_promise(js, promise, value);
}
static void pipes_shutdown_from_source_error(ant_t *js, ant_value_t state, ant_value_t error) {
pipe_state_t *pst = pipe_get_state(state);
if (!pst || pst->settled || pst->shutting_down) return;
pst->shutting_down = true;
pst->in_flight = false;
if (!pst->prevent_abort) {
ant_value_t result = writable_stream_abort(js, pipe_state_dest(state), error);
promise_mark_handled(result);
}
pipes_settle(js, state, false, error);
}
static void pipes_shutdown_from_dest_error(ant_t *js, ant_value_t state, ant_value_t error) {
pipe_state_t *pst = pipe_get_state(state);
if (!pst || pst->settled || pst->shutting_down) return;
pst->shutting_down = true;
pst->in_flight = false;
if (!pst->prevent_cancel) {
ant_value_t result = readable_stream_cancel(js, pipe_state_source(state), error);
promise_mark_handled(result);
}
pipes_settle(js, state, false, error);
}
static void pipes_shutdown_from_abort(ant_t *js, ant_value_t state, ant_value_t reason) {
pipe_state_t *pst = pipe_get_state(state);
if (!pst || pst->settled || pst->shutting_down) return;
pst->shutting_down = true;
pst->in_flight = false;
if (!pst->prevent_abort) {
ant_value_t result = writable_stream_abort(js, pipe_state_dest(state), reason);
promise_mark_handled(result);
}
if (!pst->prevent_cancel) {
ant_value_t result = readable_stream_cancel(js, pipe_state_source(state), reason);
promise_mark_handled(result);
}
pipes_settle(js, state, false, reason);
}
static void pipes_pump(ant_t *js, ant_value_t state);
static ant_value_t pipe_write_resolve(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
pipe_state_t *pst = pipe_get_state(state);
if (!pst) return js_mkundef();
pst->in_flight = false;
if (pst->settled || pst->shutting_down)
return js_mkundef();
pipes_pump(js, state);
return js_mkundef();
}
static ant_value_t pipe_dest_error(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
ant_value_t error = (nargs > 0) ? args[0] : js_mkundef();
pipes_shutdown_from_dest_error(js, state, error);
return js_mkundef();
}
static ant_value_t pipe_close_dest_resolve(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
pipes_settle(js, state, true, js_mkundef());
return js_mkundef();
}
static ant_value_t pipe_close_dest_reject(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
ant_value_t error = (nargs > 0) ? args[0] : js_mkundef();
pipes_settle(js, state, false, error);
return js_mkundef();
}
static ant_value_t pipe_ignore(ant_t *js, ant_value_t *args, int nargs) {
return js_mkundef();
}
static ant_value_t pipe_read_resolve(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
pipe_state_t *pst = pipe_get_state(state);
if (!pst || pst->settled || pst->shutting_down) {
if (pst) pst->in_flight = false;
return js_mkundef();
}
ant_value_t result = (nargs > 0) ? args[0] : js_mkundef();
bool done = js_truthy(js, js_get(js, result, "done"));
if (done) {
pst->in_flight = false;
if (pst->prevent_close) {
pipes_settle(js, state, true, js_mkundef());
return js_mkundef();
}
ant_value_t close_promise = writable_stream_close(js, pipe_state_dest(state));
ant_value_t on_resolve = js_heavy_mkfun(js, pipe_close_dest_resolve, state);
ant_value_t on_reject = js_heavy_mkfun(js, pipe_close_dest_reject, state);
pipes_chain_promise(js, close_promise, on_resolve, on_reject);
return js_mkundef();
}
ant_value_t value = js_get(js, result, "value");
ant_value_t write_promise = ws_writer_write(js, pipe_state_writer(state), value);
ant_value_t on_resolve = js_heavy_mkfun(js, pipe_write_resolve, state);
ant_value_t on_reject = js_heavy_mkfun(js, pipe_dest_error, state);
pipes_chain_promise(js, write_promise, on_resolve, on_reject);
return js_mkundef();
}
static ant_value_t pipe_source_error(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
ant_value_t error = (nargs > 0) ? args[0] : js_mkundef();
pipes_shutdown_from_source_error(js, state, error);
return js_mkundef();
}
static ant_value_t pipe_ready_resolve(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
pipe_state_t *pst = pipe_get_state(state);
if (!pst || pst->settled || pst->shutting_down) {
if (pst) pst->in_flight = false;
return js_mkundef();
}
ant_value_t read_promise = rs_default_reader_read(js, pipe_state_reader(state));
ant_value_t on_resolve = js_heavy_mkfun(js, pipe_read_resolve, state);
ant_value_t on_reject = js_heavy_mkfun(js, pipe_source_error, state);
pipes_chain_promise(js, read_promise, on_resolve, on_reject);
return js_mkundef();
}
static ant_value_t pipe_abort_listener(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
pipe_state_t *pst = pipe_get_state(state);
if (!pst || pst->settled || pst->shutting_down)
return js_mkundef();
ant_value_t signal = js_get_slot(state, SLOT_RS_CANCEL);
pipes_shutdown_from_abort(js, state, abort_signal_get_reason(signal));
return js_mkundef();
}
static void pipes_pump(ant_t *js, ant_value_t state) {
pipe_state_t *pst = pipe_get_state(state);
if (!pst || pst->settled || pst->shutting_down || pst->in_flight) return;
pst->in_flight = true;
ant_value_t writer = pipe_state_writer(state);
ant_value_t ready = ws_writer_ready(writer);
ant_value_t on_resolve = js_heavy_mkfun(js, pipe_ready_resolve, state);
ant_value_t on_reject = js_heavy_mkfun(js, pipe_dest_error, state);
pipes_chain_promise(js, ready, on_resolve, on_reject);
}
static ant_value_t pipe_create_rejected(ant_t *js, ant_value_t error) {
ant_value_t promise = js_mkpromise(js);
js_reject_promise(js, promise, error);
return promise;
}
static void pipes_parse_options(
ant_t *js, ant_value_t options,
bool *prevent_close, bool *prevent_abort, bool *prevent_cancel,
ant_value_t *signal
) {
*prevent_close = false;
*prevent_abort = false;
*prevent_cancel = false;
*signal = js_mkundef();
if (!is_object_type(options)) return;
*prevent_close = js_truthy(js, js_get(js, options, "preventClose"));
*prevent_abort = js_truthy(js, js_get(js, options, "preventAbort"));
*prevent_cancel = js_truthy(js, js_get(js, options, "preventCancel"));
*signal = js_get(js, options, "signal");
}
ant_value_t readable_stream_pipe_to(
ant_t *js, ant_value_t source, ant_value_t dest,
bool prevent_close, bool prevent_abort, bool prevent_cancel,
ant_value_t signal
) {
rs_stream_t *rs = rs_get_stream(source);
ws_stream_t *ws = ws_get_stream(dest);
if (!rs || !ws) {
js_mkerr_typed(js, JS_ERR_TYPE, "pipeTo requires a ReadableStream and WritableStream");
return pipe_create_rejected(js, js->thrown_value);
}
if (rs_is_reader(rs_stream_reader(source))) {
js_mkerr_typed(js, JS_ERR_TYPE, "ReadableStream is already locked");
return pipe_create_rejected(js, js->thrown_value);
}
if (ws_is_writer(ws_stream_writer(dest))) {
js_mkerr_typed(js, JS_ERR_TYPE, "WritableStream is already locked");
return pipe_create_rejected(js, js->thrown_value);
}
if (!is_undefined(signal) && !abort_signal_is_signal(signal)) {
js_mkerr_typed(js, JS_ERR_TYPE, "pipeTo option 'signal' must be an AbortSignal");
return pipe_create_rejected(js, js->thrown_value);
}
ant_value_t reader_args[1] = { source };
ant_value_t saved = js->new_target;
js->new_target = g_reader_proto;
ant_value_t reader = js_rs_reader_ctor(js, reader_args, 1);
js->new_target = saved;
if (is_err(reader)) return pipe_create_rejected(js, js->thrown_value);
rs->disturbed = true;
ant_value_t writer = ws_acquire_writer(js, dest);
if (is_err(writer)) {
pipes_release_reader(js, reader);
return pipe_create_rejected(js, js->thrown_value);
}
pipe_state_t *pst = calloc(1, sizeof(pipe_state_t));
if (!pst) return js_mkerr(js, "out of memory");
pst->prevent_close = prevent_close;
pst->prevent_abort = prevent_abort;
pst->prevent_cancel = prevent_cancel;
ant_value_t promise = js_mkpromise(js);
ant_value_t state = js_mkobj(js);
js_set_slot(state, SLOT_DATA, ANT_PTR(pst));
js_set_slot(state, SLOT_ENTRIES, source);
js_set_slot(state, SLOT_CTOR, dest);
js_set_slot(state, SLOT_BUFFER, reader);
js_set_slot(state, SLOT_DEFAULT, writer);
js_set_slot(state, SLOT_RS_PULL, promise);
js_set_finalizer(state, pipe_state_finalize);
js_set_slot(state, SLOT_RS_CANCEL, signal);
promise_mark_handled(rs_reader_closed(reader));
promise_mark_handled(js_get_slot(writer, SLOT_RS_CLOSED));
promise_mark_handled(js_get_slot(writer, SLOT_WS_READY));
ant_value_t ignore_fn = js_heavy_mkfun(js, pipe_ignore, state);
ant_value_t source_closed_reject = js_heavy_mkfun(js, pipe_source_error, state);
ant_value_t dest_closed_reject = js_heavy_mkfun(js, pipe_dest_error, state);
pipes_chain_promise(js, rs_reader_closed(reader), ignore_fn, source_closed_reject);
pipes_chain_promise(js, js_get_slot(writer, SLOT_RS_CLOSED), ignore_fn, dest_closed_reject);
if (abort_signal_is_signal(signal) && abort_signal_is_aborted(signal)) {
pipes_shutdown_from_abort(js, state, abort_signal_get_reason(signal));
return promise;
}
if (abort_signal_is_signal(signal)) {
ant_value_t listener = js_heavy_mkfun(js, pipe_abort_listener, state);
js_set_slot(state, SLOT_PIPE_ABORT_LISTENER, listener);
abort_signal_add_listener(js, signal, listener);
}
pipes_pump(js, state);
return promise;
}
static ant_value_t js_rs_pipe_to(ant_t *js, ant_value_t *args, int nargs) {
if (!rs_is_stream(js->this_val)) {
js_mkerr_typed(js, JS_ERR_TYPE, "Invalid ReadableStream");
return pipe_create_rejected(js, js->thrown_value);
}
ant_value_t dest = (nargs > 0) ? args[0] : js_mkundef();
bool prevent_close, prevent_abort, prevent_cancel;
ant_value_t signal;
pipes_parse_options(js, nargs > 1 ? args[1] : js_mkundef(),
&prevent_close, &prevent_abort, &prevent_cancel, &signal);
return readable_stream_pipe_to(js, js->this_val, dest,
prevent_close, prevent_abort, prevent_cancel, signal);
}
static ant_value_t js_rs_pipe_through(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t source = js->this_val;
if (!rs_is_stream(source)) return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid ReadableStream");
if (rs_is_reader(rs_stream_reader(source)))
return js_mkerr_typed(js, JS_ERR_TYPE, "ReadableStream is already locked");
if (nargs < 1 || !is_object_type(args[0]))
return js_mkerr_typed(js, JS_ERR_TYPE, "pipeThrough requires a transform object");
ant_value_t transform = args[0];
ant_value_t writable = js_get(js, transform, "writable");
ant_value_t readable = js_get(js, transform, "readable");
if (!ws_is_stream(writable))
return js_mkerr_typed(js, JS_ERR_TYPE, "pipeThrough transform.writable must be a WritableStream");
if (!rs_is_stream(readable))
return js_mkerr_typed(js, JS_ERR_TYPE, "pipeThrough transform.readable must be a ReadableStream");
if (ws_is_writer(ws_stream_writer(writable)))
return js_mkerr_typed(js, JS_ERR_TYPE, "WritableStream is already locked");
bool prevent_close, prevent_abort, prevent_cancel;
ant_value_t signal;
pipes_parse_options(
js, nargs > 1 ? args[1] : js_mkundef(),
&prevent_close, &prevent_abort, &prevent_cancel, &signal
);
ant_value_t pipe_promise = readable_stream_pipe_to(
js, source, writable,
prevent_close, prevent_abort, prevent_cancel, signal
);
promise_mark_handled(pipe_promise);
return readable;
}
typedef struct {
bool pulling;
bool done;
bool canceled1;
bool canceled2;
} tee_state_t;
static void tee_state_finalize(ant_t *js, ant_object_t *obj) {
if (!obj->extra_slots) return;
ant_extra_slot_t *entries = (ant_extra_slot_t *)obj->extra_slots;
for (uint8_t i = 0; i < obj->extra_count; i++) {
if (entries[i].slot == SLOT_DATA && vtype(entries[i].value) == T_NUM) {
free((tee_state_t *)(uintptr_t)(size_t)js_getnum(entries[i].value));
return;
}}
}
static tee_state_t *tee_get_state(ant_value_t state) {
ant_value_t s = js_get_slot(state, SLOT_DATA);
if (vtype(s) != T_NUM) return NULL;
return (tee_state_t *)(uintptr_t)(size_t)js_getnum(s);
}
static ant_value_t tee_state_reader(ant_value_t state) {
return js_get_slot(state, SLOT_BUFFER);
}
static void tee_release_reader(ant_t *js, ant_value_t state) {
ant_value_t reader = tee_state_reader(state);
if (!rs_is_reader(reader)) return;
pipes_release_reader(js, reader);
js_set_slot(state, SLOT_BUFFER, js_mkundef());
}
static void tee_resolve_cancel_promises(ant_t *js, ant_value_t state) {
ant_value_t p1 = js_get_slot(state, SLOT_RS_CLOSED);
ant_value_t p2 = js_get_slot(state, SLOT_RS_SIZE);
if (vtype(p1) == T_PROMISE) {
js_resolve_promise(js, p1, js_mkundef());
js_set_slot(state, SLOT_RS_CLOSED, js_mkundef());
}
if (vtype(p2) == T_PROMISE) {
js_resolve_promise(js, p2, js_mkundef());
js_set_slot(state, SLOT_RS_SIZE, js_mkundef());
}
}
static void tee_reject_cancel_promises(ant_t *js, ant_value_t state, ant_value_t error) {
ant_value_t p1 = js_get_slot(state, SLOT_RS_CLOSED);
ant_value_t p2 = js_get_slot(state, SLOT_RS_SIZE);
if (vtype(p1) == T_PROMISE) {
js_reject_promise(js, p1, error);
js_set_slot(state, SLOT_RS_CLOSED, js_mkundef());
}
if (vtype(p2) == T_PROMISE) {
js_reject_promise(js, p2, error);
js_set_slot(state, SLOT_RS_SIZE, js_mkundef());
}
}
static void tee_finalize(ant_t *js, ant_value_t state) {
tee_state_t *st = tee_get_state(state);
if (!st || st->done) return;
st->done = true;
tee_release_reader(js, state);
}
static void tee_close_branch(ant_t *js, ant_value_t branch_stream) {
ant_value_t ctrl = rs_stream_controller(js, branch_stream);
rs_controller_t *c = rs_get_controller(ctrl);
if (!c || c->close_requested) return;
c->close_requested = true;
if (rs_ctrl_queue_len(js, ctrl) == 0) {
rs_default_controller_clear_algorithms(ctrl);
readable_stream_close(js, branch_stream);
}
}
static void tee_enqueue_branch(ant_t *js, ant_value_t branch_stream, ant_value_t value) {
ant_value_t ctrl = rs_stream_controller(js, branch_stream);
rs_controller_t *c = rs_get_controller(ctrl);
if (!c || !rs_default_controller_can_close_or_enqueue(c, rs_get_stream(branch_stream)))
return;
ant_value_t r = rs_stream_reader(branch_stream);
if (rs_is_reader(r) && rs_reader_has_reqs(js, r)) {
rs_fulfill_read_request(js, branch_stream, value, false);
rs_default_controller_call_pull_if_needed(js, ctrl);
return;
}
double chunk_size = 1;
ant_value_t size_fn = rs_ctrl_size(ctrl);
if (is_callable(size_fn)) {
ant_value_t sa[1] = { value };
ant_value_t sr = sv_vm_call(js->vm, js, size_fn, js_mkundef(), sa, 1, NULL, false);
if (!is_err(sr))
chunk_size = vtype(sr) == T_NUM ? js_getnum(sr) : js_to_number(js, sr);
}
rs_ctrl_queue_push(js, ctrl, value);
if (c->queue_sizes_len >= c->queue_sizes_cap) {
uint32_t nc = c->queue_sizes_cap ? c->queue_sizes_cap * 2 : 8;
double *ns = realloc(c->queue_sizes, nc * sizeof(double));
if (ns) { c->queue_sizes = ns; c->queue_sizes_cap = nc; }
}
if (c->queue_sizes_len < c->queue_sizes_cap)
c->queue_sizes[c->queue_sizes_len++] = chunk_size;
c->queue_total_size += chunk_size;
rs_default_controller_call_pull_if_needed(js, ctrl);
}
static void tee_error_branch(ant_t *js, ant_value_t branch_stream, ant_value_t error) {
readable_stream_error(js, branch_stream, error);
}
static void tee_pull(ant_t *js, ant_value_t state);
static ant_value_t tee_read_reject(ant_t *js, ant_value_t *args, int nargs);
static ant_value_t tee_cancel_both_resolve(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
tee_resolve_cancel_promises(js, state);
tee_finalize(js, state);
return js_mkundef();
}
static ant_value_t tee_cancel_both_reject(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
ant_value_t error = (nargs > 0) ? args[0] : js_mkundef();
tee_reject_cancel_promises(js, state, error);
tee_finalize(js, state);
return js_mkundef();
}
static ant_value_t tee_read_resolve(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
tee_state_t *st = tee_get_state(state);
if (!st) return js_mkundef();
st->pulling = false;
if (st->done) return js_mkundef();
ant_value_t result = (nargs > 0) ? args[0] : js_mkundef();
bool done = js_truthy(js, js_get(js, result, "done"));
ant_value_t branch1 = js_get_slot(state, SLOT_CTOR);
ant_value_t branch2 = js_get_slot(state, SLOT_DEFAULT);
if (done) {
if (!st->canceled1) tee_close_branch(js, branch1);
if (!st->canceled2) tee_close_branch(js, branch2);
tee_resolve_cancel_promises(js, state);
tee_finalize(js, state);
return js_mkundef();
}
ant_value_t value = js_get(js, result, "value");
ant_value_t clone = value;
if (!st->canceled1 && !st->canceled2) {
ant_value_t clone_args[1] = { value };
clone = js_structured_clone(js, clone_args, 1);
if (is_err(clone)) {
tee_read_reject(js, &clone, 1);
return js_mkundef();
}}
if (!st->canceled1) tee_enqueue_branch(js, branch1, value);
if (!st->canceled2) tee_enqueue_branch(js, branch2, clone);
return js_mkundef();
}
static ant_value_t tee_read_reject(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
tee_state_t *st = tee_get_state(state);
if (!st) return js_mkundef();
ant_value_t error = (nargs > 0) ? args[0] : js_mkundef();
st->pulling = false;
if (st->done) return js_mkundef();
ant_value_t branch1 = js_get_slot(state, SLOT_CTOR);
ant_value_t branch2 = js_get_slot(state, SLOT_DEFAULT);
if (!st->canceled1) tee_error_branch(js, branch1, error);
if (!st->canceled2) tee_error_branch(js, branch2, error);
tee_resolve_cancel_promises(js, state);
tee_finalize(js, state);
return js_mkundef();
}
static void tee_pull(ant_t *js, ant_value_t state) {
tee_state_t *st = tee_get_state(state);
if (!st || st->done || st->pulling) return;
if (st->canceled1 && st->canceled2) return;
st->pulling = true;
ant_value_t read_promise = rs_default_reader_read(js, tee_state_reader(state));
ant_value_t on_resolve = js_heavy_mkfun(js, tee_read_resolve, state);
ant_value_t on_reject = js_heavy_mkfun(js, tee_read_reject, state);
pipes_chain_promise(js, read_promise, on_resolve, on_reject);
}
static ant_value_t tee_branch_pull(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t state = js_get_slot(js->current_func, SLOT_DATA);
tee_pull(js, state);
ant_value_t promise = js_mkpromise(js);
js_resolve_promise(js, promise, js_mkundef());
return promise;
}
static ant_value_t tee_branch_cancel(ant_t *js, ant_value_t *args, int nargs) {
ant_value_t wrapper = js_get_slot(js->current_func, SLOT_DATA);
ant_value_t state = js_get_slot(wrapper, SLOT_DATA);
int branch = (int)js_getnum(js_get_slot(wrapper, SLOT_ENTRIES));
ant_value_t reason = (nargs > 0) ? args[0] : js_mkundef();
tee_state_t *st = tee_get_state(state);
if (!st) return js_mkundef();
bool is_b1 = (branch == 1);
internal_slot_t reason_slot = is_b1 ? SLOT_RS_PULL : SLOT_RS_CANCEL;
internal_slot_t promise_slot = is_b1 ? SLOT_RS_CLOSED : SLOT_RS_SIZE;
bool already_canceled = is_b1 ? st->canceled1 : st->canceled2;
if (already_canceled) {
ant_value_t existing = js_get_slot(state, promise_slot);
if (vtype(existing) == T_PROMISE) return existing;
ant_value_t resolved = js_mkpromise(js);
js_resolve_promise(js, resolved, js_mkundef());
return resolved;
}
if (is_b1) st->canceled1 = true;
else st->canceled2 = true;
js_set_slot(state, reason_slot, reason);
ant_value_t promise = js_mkpromise(js);
js_set_slot(state, promise_slot, promise);
if (st->done) {
js_resolve_promise(js, promise, js_mkundef());
js_set_slot(state, promise_slot, js_mkundef());
return promise;
}
if (st->canceled1 && st->canceled2) {
ant_value_t reasons = js_mkarr(js);
js_arr_push(js, reasons, js_get_slot(state, SLOT_RS_PULL));
js_arr_push(js, reasons, js_get_slot(state, SLOT_RS_CANCEL));
ant_value_t orig_stream = js_get_slot(state, SLOT_ENTRIES);
ant_value_t cancel_promise = readable_stream_cancel(js, orig_stream, reasons);
ant_value_t on_resolve = js_heavy_mkfun(js, tee_cancel_both_resolve, state);
ant_value_t on_reject = js_heavy_mkfun(js, tee_cancel_both_reject, state);
pipes_chain_promise(js, cancel_promise, on_resolve, on_reject);
}
return promise;
}
static ant_value_t js_rs_tee(ant_t *js, ant_value_t *args, int nargs) {
if (!rs_is_stream(js->this_val)) return js_mkerr_typed(js, JS_ERR_TYPE, "Invalid ReadableStream");
if (rs_is_reader(rs_stream_reader(js->this_val)))
return js_mkerr_typed(js, JS_ERR_TYPE, "ReadableStream is already locked");
ant_value_t reader_args[1] = { js->this_val };
ant_value_t saved = js->new_target;
js->new_target = g_reader_proto;
ant_value_t reader = js_rs_reader_ctor(js, reader_args, 1);
js->new_target = saved;
if (is_err(reader)) return reader;
tee_state_t *st = calloc(1, sizeof(tee_state_t));
if (!st) return js_mkerr(js, "out of memory");
ant_value_t state = js_mkobj(js);
js_set_slot(state, SLOT_DATA, ANT_PTR(st));
js_set_slot(state, SLOT_ENTRIES, js->this_val);
js_set_slot(state, SLOT_BUFFER, reader);
js_set_slot(state, SLOT_RS_PULL, js_mkundef());
js_set_slot(state, SLOT_RS_CANCEL, js_mkundef());
js_set_slot(state, SLOT_RS_CLOSED, js_mkundef());
js_set_slot(state, SLOT_RS_SIZE, js_mkundef());
js_set_finalizer(state, tee_state_finalize);
ant_value_t pull1 = js_heavy_mkfun(js, tee_branch_pull, state);
ant_value_t pull2 = js_heavy_mkfun(js, tee_branch_pull, state);
ant_value_t cancel1_wrap = js_mkobj(js);
js_set_slot(cancel1_wrap, SLOT_DATA, state);
js_set_slot(cancel1_wrap, SLOT_ENTRIES, js_mknum(1));
ant_value_t cancel1 = js_heavy_mkfun(js, tee_branch_cancel, cancel1_wrap);
ant_value_t cancel2_wrap = js_mkobj(js);
js_set_slot(cancel2_wrap, SLOT_DATA, state);
js_set_slot(cancel2_wrap, SLOT_ENTRIES, js_mknum(2));
ant_value_t cancel2 = js_heavy_mkfun(js, tee_branch_cancel, cancel2_wrap);
ant_value_t branch1 = rs_create_stream(js, pull1, cancel1, 1);
ant_value_t branch2 = rs_create_stream(js, pull2, cancel2, 1);
if (is_err(branch1) || is_err(branch2)) {
tee_release_reader(js, state);
return is_err(branch1) ? branch1 : branch2;
}
js_set_slot(state, SLOT_CTOR, branch1);
js_set_slot(state, SLOT_DEFAULT, branch2);
ant_value_t result = js_mkarr(js);
js_arr_push(js, result, branch1);
js_arr_push(js, result, branch2);
return result;
}
ant_value_t readable_stream_tee(ant_t *js, ant_value_t source) {
ant_value_t saved_this = js->this_val;
js->this_val = source;
ant_value_t result = js_rs_tee(js, NULL, 0);
js->this_val = saved_this;
return result;
}
void init_pipes_proto(ant_t *js, ant_value_t rs_proto) {
js_set(js, rs_proto, "pipeTo", js_mkfun(js_rs_pipe_to));
js_set_descriptor(js, rs_proto, "pipeTo", 6, JS_DESC_W | JS_DESC_C);
js_set(js, rs_proto, "pipeThrough", js_mkfun(js_rs_pipe_through));
js_set_descriptor(js, rs_proto, "pipeThrough", 11, JS_DESC_W | JS_DESC_C);
js_set(js, rs_proto, "tee", js_mkfun(js_rs_tee));
js_set_descriptor(js, rs_proto, "tee", 3, JS_DESC_W | JS_DESC_C);
}

File Metadata

Mime Type
text/x-c
Expires
Sat, Apr 4, 2:06 AM (1 d, 22 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
520842
Default Alt Text
pipes.c (27 KB)

Event Timeline