Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F4397764
stream.c
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
72 KB
Referenced Files
None
Subscribers
None
stream.c
View Options
#include
<stdlib.h>
#include
<string.h>
#include
"ant.h"
#include
"ptr.h"
#include
"internal.h"
#include
"silver/engine.h"
#include
"esm/loader.h"
#include
"gc/roots.h"
#include
"modules/assert.h"
#include
"modules/buffer.h"
#include
"modules/events.h"
#include
"modules/stream.h"
#include
"modules/symbol.h"
#include
"modules/string_decoder.h"
enum
{
STREAM_NATIVE_TAG
=
0x5354524Du
};
// STRM
static
ant_value_t
g_stream_proto
=
0
;
static
ant_value_t
g_stream_ctor
=
0
;
static
ant_value_t
g_readable_proto
=
0
;
static
ant_value_t
g_readable_ctor
=
0
;
static
ant_value_t
g_writable_proto
=
0
;
static
ant_value_t
g_writable_ctor
=
0
;
static
ant_value_t
g_duplex_proto
=
0
;
static
ant_value_t
g_duplex_ctor
=
0
;
static
ant_value_t
g_transform_proto
=
0
;
static
ant_value_t
g_transform_ctor
=
0
;
static
ant_value_t
g_passthrough_proto
=
0
;
static
ant_value_t
g_passthrough_ctor
=
0
;
static
double
g_default_high_water_mark
=
16384.0
;
static
double
g_default_object_high_water_mark
=
16.0
;
static
ant_value_t
stream_noop
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
return
js_mkundef
();
}
static
bool
stream_is_instance
(
ant_value_t
value
)
{
return
is_object_type
(
value
)
&&
js_check_native_tag
(
value
,
STREAM_NATIVE_TAG
);
}
static
inline
void
stream_set_end_callback
(
ant_t
*
js
,
ant_value_t
stream_obj
,
ant_value_t
callback
)
{
js_set_slot_wb
(
js
,
stream_obj
,
SLOT_AUX
,
callback
);
}
static
stream_private_state_t
*
stream_private_state
(
ant_value_t
stream_obj
)
{
if
(
!
stream_is_instance
(
stream_obj
))
return
NULL
;
return
(
stream_private_state_t
*
)
js_get_native_ptr
(
stream_obj
);
}
static
ant_value_t
stream_require_this
(
ant_t
*
js
,
ant_value_t
value
,
const
char
*
label
)
{
if
(
!
stream_is_instance
(
value
))
return
js_mkerr_typed
(
js
,
JS_ERR_TYPE
,
"Invalid %s"
,
label
);
return
value
;
}
static
ant_value_t
stream_truthy_or_object
(
ant_t
*
js
,
ant_value_t
value
)
{
return
js_truthy
(
js
,
value
)
?
value
:
js_mkobj
(
js
);
}
static
ant_value_t
stream_readable_state
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
return
js_get
(
js
,
stream_obj
,
"_readableState"
);
}
static
ant_value_t
stream_writable_state
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
return
js_get
(
js
,
stream_obj
,
"_writableState"
);
}
static
ant_value_t
stream_pipes
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
return
js_get
(
js
,
stream_obj
,
"_pipes"
);
}
static
bool
stream_key_is_cstr
(
ant_t
*
js
,
ant_value_t
value
,
const
char
*
expected
)
{
size_t
len
=
0
;
const
char
*
s
=
NULL
;
if
(
vtype
(
value
)
!=
T_STR
)
return
false
;
s
=
js_getstr
(
js
,
value
,
&
len
);
return
s
&&
len
==
strlen
(
expected
)
&&
memcmp
(
s
,
expected
,
len
)
==
0
;
}
static
ant_value_t
stream_event_key
(
ant_t
*
js
,
ant_value_t
value
)
{
uint8_t
t
=
vtype
(
value
);
if
(
t
==
T_STR
||
t
==
T_SYMBOL
)
return
value
;
return
js_mkerr
(
js
,
"event must be a string or Symbol"
);
}
void
*
stream_get_attached_state
(
ant_value_t
stream_obj
)
{
stream_private_state_t
*
priv
=
stream_private_state
(
stream_obj
);
return
priv
?
priv
->
attached_state
:
NULL
;
}
void
stream_set_attached_state
(
ant_value_t
stream_obj
,
void
*
state
,
stream_finalize_fn
finalize
)
{
stream_private_state_t
*
priv
=
stream_private_state
(
stream_obj
);
if
(
!
priv
)
return
;
priv
->
attached_state
=
state
;
priv
->
attached_state_finalize
=
finalize
;
}
void
stream_clear_attached_state
(
ant_value_t
stream_obj
)
{
stream_private_state_t
*
priv
=
stream_private_state
(
stream_obj
);
if
(
!
priv
)
return
;
priv
->
attached_state
=
NULL
;
priv
->
attached_state_finalize
=
NULL
;
}
static
void
stream_finalize
(
ant_t
*
js
,
ant_object_t
*
obj
)
{
ant_value_t
stream_obj
=
js_obj_from_ptr
(
obj
);
stream_private_state_t
*
priv
=
stream_private_state
(
stream_obj
);
if
(
!
priv
)
return
;
js_set_native_ptr
(
stream_obj
,
NULL
);
if
(
priv
->
attached_state
&&
priv
->
attached_state_finalize
)
priv
->
attached_state_finalize
(
js
,
stream_obj
,
priv
->
attached_state
);
free
(
priv
);
}
static
ant_value_t
stream_call
(
ant_t
*
js
,
ant_value_t
fn
,
ant_value_t
this_val
,
ant_value_t
*
args
,
int
nargs
,
bool
is_ctor
)
{
if
(
!
is_callable
(
fn
))
return
js_mkundef
();
if
(
sv_check_c_stack_overflow
(
js
))
return
js_mkerr_typed
(
js
,
JS_ERR_RANGE
|
JS_ERR_NO_STACK
,
"Maximum call stack size exceeded"
);
sv_call_mode_t
mode
=
is_ctor
?
SV_CALL_MODE_CONSTRUCT
:
SV_CALL_MODE_NORMAL
;
sv_call_plan_t
plan
;
ant_value_t
err
=
sv_prepare_call
(
js
->
vm
,
js
,
fn
,
this_val
,
args
,
nargs
,
NULL
,
mode
,
&
plan
);
if
(
is_err
(
err
))
return
err
;
return
sv_execute_call_plan
(
js
->
vm
,
js
,
&
plan
,
NULL
);
}
static
ant_value_t
stream_call_prop
(
ant_t
*
js
,
ant_value_t
target
,
const
char
*
name
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
fn
=
js_getprop_fallback
(
js
,
target
,
name
);
if
(
is_err
(
fn
)
||
!
is_callable
(
fn
))
return
js_mkundef
();
return
stream_call
(
js
,
fn
,
target
,
args
,
nargs
,
false
);
}
static
void
stream_call_callback
(
ant_t
*
js
,
ant_value_t
fn
,
ant_value_t
*
args
,
int
nargs
)
{
if
(
!
is_callable
(
fn
))
return
;
stream_call
(
js
,
fn
,
js_mkundef
(),
args
,
nargs
,
false
);
}
static
void
stream_schedule_microtask
(
ant_t
*
js
,
ant_cfunc_t
fn
,
ant_value_t
data
)
{
ant_value_t
promise
=
js_mkpromise
(
js
);
ant_value_t
cb
=
js_heavy_mkfun
(
js
,
fn
,
data
);
ant_value_t
then_result
=
0
;
js_resolve_promise
(
js
,
promise
,
js_mkundef
());
then_result
=
js_promise_then
(
js
,
promise
,
cb
,
js_mkundef
());
promise_mark_handled
(
then_result
);
}
static
ant_value_t
stream_buffer_ctor
(
ant_t
*
js
)
{
ant_value_t
ns
=
js_esm_import_sync_cstr
(
js
,
"buffer"
,
6
);
if
(
is_err
(
ns
))
return
ns
;
return
js_get
(
js
,
ns
,
"Buffer"
);
}
static
ant_value_t
stream_readable_decoder
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
!
is_object_type
(
state
))
return
js_mkundef
();
return
js_get
(
js
,
state
,
"decoder"
);
}
static
ant_value_t
stream_readable_decode_chunk
(
ant_t
*
js
,
ant_value_t
stream_obj
,
ant_value_t
chunk
,
bool
flush
)
{
ant_value_t
decoder
=
stream_readable_decoder
(
js
,
stream_obj
);
if
(
!
is_object_type
(
decoder
))
return
chunk
;
return
string_decoder_decode_value
(
js
,
decoder
,
chunk
,
flush
);
}
static
bool
stream_value_is_empty_string
(
ant_t
*
js
,
ant_value_t
value
)
{
size_t
len
=
0
;
if
(
vtype
(
value
)
!=
T_STR
)
return
false
;
(
void
)
js_getstr
(
js
,
value
,
&
len
);
return
len
==
0
;
}
static
ant_value_t
stream_make_buffer
(
ant_t
*
js
,
ant_value_t
value
,
ant_value_t
encoding
)
{
ant_value_t
buffer_ctor
=
stream_buffer_ctor
(
js
);
ant_value_t
from_fn
=
0
;
ant_value_t
args
[
2
];
if
(
is_err
(
buffer_ctor
))
return
buffer_ctor
;
from_fn
=
js_get
(
js
,
buffer_ctor
,
"from"
);
if
(
is_err
(
from_fn
)
||
!
is_callable
(
from_fn
))
return
js_mkerr
(
js
,
"Buffer.from is not available"
);
args
[
0
]
=
value
;
args
[
1
]
=
encoding
;
return
stream_call
(
js
,
from_fn
,
buffer_ctor
,
args
,
2
,
false
);
}
static
ant_value_t
stream_normalize_chunk
(
ant_t
*
js
,
ant_value_t
chunk
,
bool
object_mode
,
ant_value_t
encoding
)
{
ant_value_t
str_val
=
0
;
if
(
object_mode
||
is_null
(
chunk
)
||
is_undefined
(
chunk
)
||
vtype
(
chunk
)
==
T_TYPEDARRAY
||
buffer_is_binary_source
(
chunk
)
)
return
chunk
;
if
(
vtype
(
chunk
)
==
T_STR
)
return
stream_make_buffer
(
js
,
chunk
,
encoding
);
str_val
=
js_tostring_val
(
js
,
chunk
);
if
(
is_err
(
str_val
))
return
str_val
;
return
stream_make_buffer
(
js
,
str_val
,
encoding
);
}
static
ant_value_t
stream_readable_buffer
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
!
is_object_type
(
state
))
return
js_mkundef
();
return
js_get
(
js
,
state
,
"buffer"
);
}
static
ant_offset_t
stream_readable_buffer_head
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
ant_value_t
head
=
is_object_type
(
state
)
?
js_get
(
js
,
state
,
"bufferHead"
)
:
js_mkundef
();
return
vtype
(
head
)
==
T_NUM
?
(
ant_offset_t
)
js_getnum
(
head
)
:
0
;
}
static
void
stream_set_readable_buffer_head
(
ant_t
*
js
,
ant_value_t
stream_obj
,
ant_offset_t
head
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
is_object_type
(
state
))
js_set
(
js
,
state
,
"bufferHead"
,
js_mknum
((
double
)
head
));
}
static
ant_offset_t
stream_readable_buffer_len
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
ant_value_t
buffer
=
stream_readable_buffer
(
js
,
stream_obj
);
ant_offset_t
head
=
stream_readable_buffer_head
(
js
,
stream_obj
);
ant_offset_t
len
=
vtype
(
buffer
)
==
T_ARR
?
js_arr_len
(
js
,
buffer
)
:
0
;
return
len
>
head
?
len
-
head
:
0
;
}
static
void
stream_compact_readable_buffer
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
ant_value_t
buffer
=
stream_readable_buffer
(
js
,
stream_obj
);
ant_offset_t
head
=
stream_readable_buffer_head
(
js
,
stream_obj
);
ant_offset_t
len
=
vtype
(
buffer
)
==
T_ARR
?
js_arr_len
(
js
,
buffer
)
:
0
;
ant_value_t
compact
=
0
;
if
(
!
is_object_type
(
state
)
||
vtype
(
buffer
)
!=
T_ARR
)
return
;
if
(
head
==
0
)
return
;
if
(
head
>=
len
)
{
compact
=
js_mkarr
(
js
);
js_set
(
js
,
state
,
"buffer"
,
compact
);
js_set
(
js
,
state
,
"bufferHead"
,
js_mknum
(
0
));
return
;
}
if
(
head
<=
32
&&
head
*
2
<
len
)
return
;
compact
=
js_mkarr
(
js
);
for
(
ant_offset_t
i
=
head
;
i
<
len
;
i
++
)
js_arr_push
(
js
,
compact
,
js_arr_get
(
js
,
buffer
,
i
));
js_set
(
js
,
state
,
"buffer"
,
compact
);
js_set
(
js
,
state
,
"bufferHead"
,
js_mknum
(
0
));
}
static
void
stream_buffer_push
(
ant_t
*
js
,
ant_value_t
stream_obj
,
ant_value_t
value
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
ant_value_t
buffer
=
stream_readable_buffer
(
js
,
stream_obj
);
if
(
!
is_object_type
(
state
)
||
vtype
(
buffer
)
!=
T_ARR
)
return
;
js_arr_push
(
js
,
buffer
,
value
);
}
static
ant_value_t
stream_buffer_shift
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
ant_value_t
buffer
=
stream_readable_buffer
(
js
,
stream_obj
);
ant_offset_t
head
=
stream_readable_buffer_head
(
js
,
stream_obj
);
ant_offset_t
len
=
vtype
(
buffer
)
==
T_ARR
?
js_arr_len
(
js
,
buffer
)
:
0
;
ant_value_t
value
=
js_mkundef
();
if
(
vtype
(
buffer
)
!=
T_ARR
||
head
>=
len
)
return
js_mkundef
();
value
=
js_arr_get
(
js
,
buffer
,
head
);
stream_set_readable_buffer_head
(
js
,
stream_obj
,
head
+
1
);
stream_compact_readable_buffer
(
js
,
stream_obj
);
return
value
;
}
static
bool
stream_listener_count_positive
(
ant_t
*
js
,
ant_value_t
target
,
const
char
*
event_name
)
{
ant_value_t
args
[
1
];
ant_value_t
result
=
0
;
args
[
0
]
=
js_mkstr
(
js
,
event_name
,
strlen
(
event_name
));
result
=
stream_call_prop
(
js
,
target
,
"listenerCount"
,
args
,
1
);
return
vtype
(
result
)
==
T_NUM
&&
js_getnum
(
result
)
>
0
;
}
static
void
stream_remove_listener
(
ant_t
*
js
,
ant_value_t
target
,
const
char
*
event_name
,
ant_value_t
listener
)
{
ant_value_t
args
[
2
];
args
[
0
]
=
js_mkstr
(
js
,
event_name
,
strlen
(
event_name
));
args
[
1
]
=
listener
;
stream_call_prop
(
js
,
target
,
"removeListener"
,
args
,
2
);
}
static
ant_value_t
stream_get_option
(
ant_t
*
js
,
ant_value_t
options
,
const
char
*
name
)
{
if
(
!
is_object_type
(
options
))
return
js_mkundef
();
return
js_get
(
js
,
options
,
name
);
}
static
double
stream_default_high_water_mark
(
bool
object_mode
)
{
return
object_mode
?
g_default_object_high_water_mark
:
g_default_high_water_mark
;
}
static
double
stream_high_water_mark_from_options
(
ant_t
*
js
,
ant_value_t
options
,
bool
object_mode
)
{
ant_value_t
hwm
=
stream_get_option
(
js
,
options
,
"highWaterMark"
);
return
(
vtype
(
hwm
)
==
T_NUM
&&
js_getnum
(
hwm
)
>
0
)
?
js_getnum
(
hwm
)
:
stream_default_high_water_mark
(
object_mode
);
}
static
ant_value_t
stream_make_base_object
(
ant_t
*
js
,
ant_value_t
proto
)
{
ant_value_t
obj
=
js_mkobj
(
js
);
stream_private_state_t
*
priv
=
calloc
(
1
,
sizeof
(
*
priv
));
if
(
is_object_type
(
proto
))
js_set_proto_init
(
obj
,
proto
);
js_set_native_tag
(
obj
,
STREAM_NATIVE_TAG
);
if
(
priv
)
js_set_native_ptr
(
obj
,
priv
);
js_set_slot
(
obj
,
SLOT_AUX
,
js_mkundef
());
js_set_finalizer
(
obj
,
stream_finalize
);
return
obj
;
}
static
void
stream_init_base
(
ant_t
*
js
,
ant_value_t
obj
,
ant_value_t
raw_options
)
{
ant_value_t
pipes
=
js_mkarr
(
js
);
js_set
(
js
,
obj
,
"readable"
,
js_true
);
js_set
(
js
,
obj
,
"writable"
,
js_true
);
js_set
(
js
,
obj
,
"destroyed"
,
js_false
);
js_set
(
js
,
obj
,
"_paused"
,
js_false
);
js_set
(
js
,
obj
,
"_pipes"
,
pipes
);
js_set
(
js
,
obj
,
"_streamOptions"
,
stream_truthy_or_object
(
js
,
raw_options
));
}
static
void
stream_init_readable
(
ant_t
*
js
,
ant_value_t
obj
,
ant_value_t
raw_options
)
{
ant_value_t
options
=
is_object_type
(
raw_options
)
?
raw_options
:
js_mkobj
(
js
);
ant_value_t
state
=
js_mkobj
(
js
);
ant_value_t
read_fn
=
stream_get_option
(
js
,
options
,
"read"
);
bool
object_mode
=
js_truthy
(
js
,
stream_get_option
(
js
,
options
,
"objectMode"
));
double
high_water_mark
=
stream_high_water_mark_from_options
(
js
,
options
,
object_mode
);
stream_init_base
(
js
,
obj
,
raw_options
);
js_set
(
js
,
obj
,
"readable"
,
js_true
);
js_set
(
js
,
obj
,
"writable"
,
js_false
);
js_set
(
js
,
obj
,
"readableEnded"
,
js_false
);
js_set
(
js
,
state
,
"objectMode"
,
js_bool
(
object_mode
));
js_set
(
js
,
state
,
"ended"
,
js_false
);
js_set
(
js
,
state
,
"endEmitted"
,
js_false
);
js_set
(
js
,
state
,
"flowing"
,
js_false
);
js_set
(
js
,
state
,
"flowingReadScheduled"
,
js_false
);
js_set
(
js
,
state
,
"reading"
,
js_false
);
js_set
(
js
,
state
,
"highWaterMark"
,
js_mknum
(
high_water_mark
));
js_set
(
js
,
state
,
"buffer"
,
js_mkarr
(
js
));
js_set
(
js
,
state
,
"bufferHead"
,
js_mknum
(
0
));
js_set
(
js
,
obj
,
"_readableState"
,
state
);
if
(
is_callable
(
read_fn
))
js_set
(
js
,
obj
,
"_read"
,
read_fn
);
}
static
void
stream_init_writable
(
ant_t
*
js
,
ant_value_t
obj
,
ant_value_t
raw_options
)
{
ant_value_t
options
=
is_object_type
(
raw_options
)
?
raw_options
:
js_mkobj
(
js
);
ant_value_t
state
=
js_mkobj
(
js
);
bool
object_mode
=
js_truthy
(
js
,
stream_get_option
(
js
,
options
,
"objectMode"
))
||
js_truthy
(
js
,
stream_get_option
(
js
,
options
,
"writableObjectMode"
));
ant_value_t
write_fn
=
stream_get_option
(
js
,
options
,
"write"
);
ant_value_t
final_fn
=
stream_get_option
(
js
,
options
,
"final"
);
ant_value_t
destroy_fn
=
stream_get_option
(
js
,
options
,
"destroy"
);
stream_init_base
(
js
,
obj
,
raw_options
);
js_set
(
js
,
obj
,
"readable"
,
js_false
);
js_set
(
js
,
obj
,
"writable"
,
js_true
);
js_set
(
js
,
obj
,
"writableEnded"
,
js_false
);
js_set
(
js
,
obj
,
"writableFinished"
,
js_false
);
js_set
(
js
,
state
,
"objectMode"
,
js_bool
(
object_mode
));
js_set
(
js
,
state
,
"finished"
,
js_false
);
js_set
(
js
,
state
,
"ended"
,
js_false
);
js_set
(
js
,
obj
,
"_writableState"
,
state
);
if
(
is_callable
(
write_fn
))
js_set
(
js
,
obj
,
"_write"
,
write_fn
);
if
(
is_callable
(
final_fn
))
js_set
(
js
,
obj
,
"_final"
,
final_fn
);
if
(
is_callable
(
destroy_fn
))
js_set
(
js
,
obj
,
"_destroy"
,
destroy_fn
);
}
static
ant_value_t
stream_construct
(
ant_t
*
js
,
ant_value_t
base_proto
,
ant_value_t
raw_options
,
void
(
*
init_fn
)(
ant_t
*
,
ant_value_t
,
ant_value_t
)
)
{
ant_value_t
proto
=
js_instance_proto_from_new_target
(
js
,
base_proto
);
ant_value_t
obj
=
stream_make_base_object
(
js
,
is_object_type
(
proto
)
?
proto
:
base_proto
);
init_fn
(
js
,
obj
,
raw_options
);
return
obj
;
}
static
ant_value_t
stream_emit_named
(
ant_t
*
js
,
ant_value_t
stream_obj
,
const
char
*
event_name
)
{
return
js_bool
(
eventemitter_emit_args
(
js
,
stream_obj
,
event_name
,
NULL
,
0
));
}
static
void
stream_emit_error
(
ant_t
*
js
,
ant_value_t
stream_obj
,
ant_value_t
error
)
{
ant_value_t
args
[
1
];
args
[
0
]
=
error
;
eventemitter_emit_args
(
js
,
stream_obj
,
"error"
,
args
,
1
);
}
static
void
stream_readable_schedule_continue_flowing
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
!
is_object_type
(
state
))
return
;
if
(
!
js_truthy
(
js
,
js_get
(
js
,
state
,
"flowing"
)))
return
;
if
(
js_truthy
(
js
,
js_get
(
js
,
stream_obj
,
"destroyed"
)))
return
;
if
(
js_truthy
(
js
,
js_get
(
js
,
state
,
"ended"
)))
return
;
if
(
stream_readable_buffer_len
(
js
,
stream_obj
)
>
0
)
return
;
if
(
js_truthy
(
js
,
js_get
(
js
,
state
,
"flowingReadScheduled"
)))
return
;
js_set
(
js
,
state
,
"flowingReadScheduled"
,
js_true
);
stream_schedule_microtask
(
js
,
stream_readable_continue_flowing
,
stream_obj
);
}
static
ant_value_t
js_stream_pause
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"stream"
);
if
(
is_err
(
stream_obj
))
return
stream_obj
;
js_set
(
js
,
stream_obj
,
"_paused"
,
js_true
);
stream_emit_named
(
js
,
stream_obj
,
"pause"
);
return
stream_obj
;
}
static
ant_value_t
js_stream_resume
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"stream"
);
if
(
is_err
(
stream_obj
))
return
stream_obj
;
js_set
(
js
,
stream_obj
,
"_paused"
,
js_false
);
stream_emit_named
(
js
,
stream_obj
,
"resume"
);
return
stream_obj
;
}
static
ant_value_t
js_stream_is_paused
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"stream"
);
ant_value_t
paused
=
0
;
if
(
is_err
(
stream_obj
))
return
stream_obj
;
paused
=
js_get
(
js
,
stream_obj
,
"_paused"
);
return
js_bool
(
js_truthy
(
js
,
paused
));
}
static
void
stream_pipe_remove_state
(
ant_t
*
js
,
ant_value_t
source
,
ant_value_t
state_obj
)
{
ant_value_t
pipes
=
stream_pipes
(
js
,
source
);
ant_offset_t
len
=
vtype
(
pipes
)
==
T_ARR
?
js_arr_len
(
js
,
pipes
)
:
0
;
ant_value_t
next
=
js_mkarr
(
js
);
for
(
ant_offset_t
i
=
0
;
i
<
len
;
i
++
)
{
ant_value_t
item
=
js_arr_get
(
js
,
pipes
,
i
);
if
(
item
!=
state_obj
)
js_arr_push
(
js
,
next
,
item
);
}
js_set
(
js
,
source
,
"_pipes"
,
next
);
}
static
void
stream_pipe_cleanup
(
ant_t
*
js
,
ant_value_t
state_obj
)
{
ant_value_t
cleaned
=
js_get
(
js
,
state_obj
,
"cleaned"
);
ant_value_t
source
=
js_get
(
js
,
state_obj
,
"source"
);
ant_value_t
dest
=
js_get
(
js
,
state_obj
,
"dest"
);
ant_value_t
on_data
=
js_get
(
js
,
state_obj
,
"onData"
);
ant_value_t
on_drain
=
js_get
(
js
,
state_obj
,
"onDrain"
);
ant_value_t
on_end
=
js_get
(
js
,
state_obj
,
"onEnd"
);
ant_value_t
on_close
=
js_get
(
js
,
state_obj
,
"onClose"
);
ant_value_t
on_error
=
js_get
(
js
,
state_obj
,
"onError"
);
if
(
js_truthy
(
js
,
cleaned
))
return
;
js_set
(
js
,
state_obj
,
"cleaned"
,
js_true
);
if
(
stream_is_instance
(
source
))
{
stream_remove_listener
(
js
,
source
,
"data"
,
on_data
);
stream_remove_listener
(
js
,
source
,
"end"
,
on_end
);
stream_remove_listener
(
js
,
source
,
"close"
,
on_close
);
stream_remove_listener
(
js
,
source
,
"error"
,
on_error
);
stream_pipe_remove_state
(
js
,
source
,
state_obj
);
}
if
(
is_object_type
(
dest
))
stream_remove_listener
(
js
,
dest
,
"drain"
,
on_drain
);
}
static
ant_value_t
stream_pipe_on_data
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
source
=
js_get
(
js
,
state_obj
,
"source"
);
ant_value_t
dest
=
js_get
(
js
,
state_obj
,
"dest"
);
ant_value_t
result
=
js_mkundef
();
if
(
!
is_object_type
(
dest
))
return
js_mkundef
();
result
=
stream_call_prop
(
js
,
dest
,
"write"
,
nargs
>
0
?
&
args
[
0
]
:
NULL
,
nargs
>
0
?
1
:
0
);
if
(
is_err
(
result
))
return
result
;
if
(
result
==
js_false
)
stream_call_prop
(
js
,
source
,
"pause"
,
NULL
,
0
);
return
js_mkundef
();
}
static
ant_value_t
stream_pipe_on_drain
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
source
=
js_get
(
js
,
state_obj
,
"source"
);
stream_call_prop
(
js
,
source
,
"resume"
,
NULL
,
0
);
return
js_mkundef
();
}
static
ant_value_t
stream_pipe_on_end
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
dest
=
js_get
(
js
,
state_obj
,
"dest"
);
bool
end_dest
=
js_truthy
(
js
,
js_get
(
js
,
state_obj
,
"end"
));
stream_pipe_cleanup
(
js
,
state_obj
);
if
(
end_dest
)
stream_call_prop
(
js
,
dest
,
"end"
,
NULL
,
0
);
return
js_mkundef
();
}
static
ant_value_t
stream_pipe_on_close
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
stream_pipe_cleanup
(
js
,
state_obj
);
return
js_mkundef
();
}
static
ant_value_t
stream_pipe_on_error
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
dest
=
js_get
(
js
,
state_obj
,
"dest"
);
stream_pipe_cleanup
(
js
,
state_obj
);
if
(
is_object_type
(
dest
)
&&
stream_listener_count_positive
(
js
,
dest
,
"error"
)
&&
nargs
>
0
)
eventemitter_emit_args
(
js
,
dest
,
"error"
,
&
args
[
0
],
1
);
return
js_mkundef
();
}
static
ant_value_t
js_stream_pipe
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
source
=
stream_require_this
(
js
,
js_getthis
(
js
),
"stream"
);
ant_value_t
options
=
nargs
>
1
?
args
[
1
]
:
js_mkundef
();
ant_value_t
state_obj
=
0
;
ant_value_t
readable_state
=
0
;
bool
end_dest
=
true
;
if
(
is_err
(
source
))
return
source
;
if
(
nargs
<
1
||
!
is_object_type
(
args
[
0
]))
return
js_mkerr
(
js
,
"pipe requires a destination stream"
);
if
(
is_object_type
(
options
))
{
ant_value_t
end_val
=
js_get
(
js
,
options
,
"end"
);
if
(
!
is_undefined
(
end_val
))
end_dest
=
end_val
!=
js_false
;
}
state_obj
=
js_mkobj
(
js
);
js_set
(
js
,
state_obj
,
"source"
,
source
);
js_set
(
js
,
state_obj
,
"dest"
,
args
[
0
]);
js_set
(
js
,
state_obj
,
"end"
,
js_bool
(
end_dest
));
js_set
(
js
,
state_obj
,
"cleaned"
,
js_false
);
js_set
(
js
,
state_obj
,
"onData"
,
js_heavy_mkfun
(
js
,
stream_pipe_on_data
,
state_obj
));
js_set
(
js
,
state_obj
,
"onDrain"
,
js_heavy_mkfun
(
js
,
stream_pipe_on_drain
,
state_obj
));
js_set
(
js
,
state_obj
,
"onEnd"
,
js_heavy_mkfun
(
js
,
stream_pipe_on_end
,
state_obj
));
js_set
(
js
,
state_obj
,
"onClose"
,
js_heavy_mkfun
(
js
,
stream_pipe_on_close
,
state_obj
));
js_set
(
js
,
state_obj
,
"onError"
,
js_heavy_mkfun
(
js
,
stream_pipe_on_error
,
state_obj
));
js_arr_push
(
js
,
stream_pipes
(
js
,
source
),
state_obj
);
eventemitter_add_listener
(
js
,
source
,
"data"
,
js_get
(
js
,
state_obj
,
"onData"
),
false
);
eventemitter_add_listener
(
js
,
source
,
"end"
,
js_get
(
js
,
state_obj
,
"onEnd"
),
true
);
eventemitter_add_listener
(
js
,
source
,
"close"
,
js_get
(
js
,
state_obj
,
"onClose"
),
true
);
eventemitter_add_listener
(
js
,
source
,
"error"
,
js_get
(
js
,
state_obj
,
"onError"
),
false
);
eventemitter_add_listener
(
js
,
args
[
0
],
"drain"
,
js_get
(
js
,
state_obj
,
"onDrain"
),
false
);
eventemitter_emit_args
(
js
,
args
[
0
],
"pipe"
,
&
source
,
1
);
readable_state
=
stream_readable_state
(
js
,
source
);
if
(
is_object_type
(
readable_state
))
js_set
(
js
,
readable_state
,
"flowing"
,
js_true
);
stream_call_prop
(
js
,
source
,
"resume"
,
NULL
,
0
);
return
args
[
0
];
}
static
ant_value_t
js_stream_unpipe
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
source
=
stream_require_this
(
js
,
js_getthis
(
js
),
"stream"
);
ant_value_t
pipes
=
0
;
ant_value_t
matches
=
0
;
ant_offset_t
len
=
0
;
ant_value_t
dest
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
if
(
is_err
(
source
))
return
source
;
pipes
=
stream_pipes
(
js
,
source
);
if
(
vtype
(
pipes
)
!=
T_ARR
)
return
source
;
matches
=
js_mkarr
(
js
);
len
=
js_arr_len
(
js
,
pipes
);
for
(
ant_offset_t
i
=
0
;
i
<
len
;
i
++
)
{
ant_value_t
state_obj
=
js_arr_get
(
js
,
pipes
,
i
);
ant_value_t
entry_dest
=
js_get
(
js
,
state_obj
,
"dest"
);
if
(
!
is_object_type
(
dest
)
||
entry_dest
==
dest
)
js_arr_push
(
js
,
matches
,
state_obj
);
}
len
=
js_arr_len
(
js
,
matches
);
for
(
ant_offset_t
i
=
0
;
i
<
len
;
i
++
)
stream_pipe_cleanup
(
js
,
js_arr_get
(
js
,
matches
,
i
));
return
source
;
}
static
ant_value_t
stream_destroy_done
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
stream_obj
=
js_get
(
js
,
state_obj
,
"stream"
);
ant_value_t
destroyed_err
=
(
nargs
>
0
)
?
args
[
0
]
:
js_mkundef
();
if
(
!
is_null
(
destroyed_err
)
&&
!
is_undefined
(
destroyed_err
))
stream_emit_error
(
js
,
stream_obj
,
destroyed_err
);
stream_emit_named
(
js
,
stream_obj
,
"close"
);
return
js_mkundef
();
}
static
ant_value_t
stream_once_call
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
fn
=
js_get
(
js
,
state_obj
,
"fn"
);
ant_value_t
this_val
=
js_get
(
js
,
state_obj
,
"thisVal"
);
ant_value_t
called
=
js_get
(
js
,
state_obj
,
"called"
);
if
(
js_truthy
(
js
,
called
))
return
js_mkundef
();
js_set
(
js
,
state_obj
,
"called"
,
js_true
);
return
stream_call
(
js
,
fn
,
this_val
,
args
,
nargs
,
false
);
}
static
ant_value_t
stream_make_once
(
ant_t
*
js
,
ant_value_t
fn
,
ant_value_t
this_val
)
{
ant_value_t
state_obj
=
js_mkobj
(
js
);
js_set
(
js
,
state_obj
,
"fn"
,
fn
);
js_set
(
js
,
state_obj
,
"thisVal"
,
this_val
);
js_set
(
js
,
state_obj
,
"called"
,
js_false
);
return
js_heavy_mkfun
(
js
,
stream_once_call
,
state_obj
);
}
static
ant_value_t
js_stream_destroy
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"stream"
);
ant_value_t
destroy_fn
=
0
;
ant_value_t
done_state
=
0
;
ant_value_t
done
=
0
;
ant_value_t
destroy_args
[
2
];
ant_value_t
error
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
ant_value_t
result
=
0
;
if
(
is_err
(
stream_obj
))
return
stream_obj
;
if
(
js_truthy
(
js
,
js_get
(
js
,
stream_obj
,
"destroyed"
)))
return
stream_obj
;
js_set
(
js
,
stream_obj
,
"destroyed"
,
js_true
);
done_state
=
js_mkobj
(
js
);
js_set
(
js
,
done_state
,
"stream"
,
stream_obj
);
done
=
stream_make_once
(
js
,
js_heavy_mkfun
(
js
,
stream_destroy_done
,
done_state
),
js_mkundef
());
destroy_fn
=
js_getprop_fallback
(
js
,
stream_obj
,
"_destroy"
);
if
(
is_callable
(
destroy_fn
))
{
destroy_args
[
0
]
=
is_undefined
(
error
)
?
js_mknull
()
:
error
;
destroy_args
[
1
]
=
done
;
result
=
stream_call
(
js
,
destroy_fn
,
stream_obj
,
destroy_args
,
2
,
false
);
return
is_err
(
result
)
?
result
:
stream_obj
;
}
destroy_args
[
0
]
=
is_undefined
(
error
)
?
js_mknull
()
:
error
;
stream_call_callback
(
js
,
done
,
destroy_args
,
1
);
return
stream_obj
;
}
static
ant_value_t
js_readable__read
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
return
js_mkundef
();
}
static
ant_value_t
stream_readable_start_flowing
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
return
stream_readable_begin_flowing
(
js
,
stream_obj
);
}
ant_value_t
stream_readable_continue_flowing
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
!
is_object_type
(
state
))
return
js_mkundef
();
js_set
(
js
,
state
,
"flowingReadScheduled"
,
js_false
);
if
(
!
js_truthy
(
js
,
js_get
(
js
,
state
,
"flowing"
)))
return
js_mkundef
();
if
(
js_truthy
(
js
,
js_get
(
js
,
stream_obj
,
"destroyed"
)))
return
js_mkundef
();
if
(
js_truthy
(
js
,
js_get
(
js
,
state
,
"ended"
)))
return
js_mkundef
();
stream_readable_maybe_read
(
js
,
stream_obj
);
stream_readable_flush
(
js
,
stream_obj
);
return
js_mkundef
();
}
ant_value_t
stream_readable_begin_flowing
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
!
is_object_type
(
state
))
return
js_mkundef
();
if
(
!
js_truthy
(
js
,
js_get
(
js
,
state
,
"flowing"
)))
return
js_mkundef
();
{
ant_value_t
saved_this
=
js
->
this_val
;
js
->
this_val
=
stream_obj
;
js_stream_resume
(
js
,
NULL
,
0
);
js
->
this_val
=
saved_this
;
}
stream_readable_maybe_read
(
js
,
stream_obj
);
stream_readable_flush
(
js
,
stream_obj
);
return
js_mkundef
();
}
ant_value_t
stream_readable_flush
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
bool
emitted_data
=
false
;
if
(
!
is_object_type
(
state
))
return
js_mkundef
();
while
(
js_truthy
(
js
,
js_get
(
js
,
state
,
"flowing"
))
&&
stream_readable_buffer_len
(
js
,
stream_obj
)
>
0
)
{
ant_value_t
chunk
=
stream_buffer_shift
(
js
,
stream_obj
);
chunk
=
stream_readable_decode_chunk
(
js
,
stream_obj
,
chunk
,
false
);
if
(
is_err
(
chunk
))
return
chunk
;
emitted_data
=
true
;
eventemitter_emit_args
(
js
,
stream_obj
,
"data"
,
&
chunk
,
1
);
}
if
(
js_truthy
(
js
,
js_get
(
js
,
state
,
"ended"
))
&&
stream_readable_buffer_len
(
js
,
stream_obj
)
==
0
&&
!
js_truthy
(
js
,
js_get
(
js
,
state
,
"endEmitted"
))
)
{
ant_value_t
tail
=
stream_readable_decode_chunk
(
js
,
stream_obj
,
js_mkundef
(),
true
);
if
(
is_err
(
tail
))
return
tail
;
if
(
!
is_undefined
(
tail
)
&&
!
stream_value_is_empty_string
(
js
,
tail
))
{
emitted_data
=
true
;
eventemitter_emit_args
(
js
,
stream_obj
,
"data"
,
&
tail
,
1
);
}
js_set
(
js
,
state
,
"endEmitted"
,
js_true
);
js_set
(
js
,
stream_obj
,
"readableEnded"
,
js_true
);
stream_emit_named
(
js
,
stream_obj
,
"end"
);
stream_emit_named
(
js
,
stream_obj
,
"close"
);
}
else
if
(
emitted_data
)
stream_readable_schedule_continue_flowing
(
js
,
stream_obj
);
return
js_mkundef
();
}
ant_value_t
stream_readable_maybe_read
(
ant_t
*
js
,
ant_value_t
stream_obj
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
ant_value_t
read_fn
=
0
;
ant_value_t
args
[
1
];
if
(
!
is_object_type
(
state
))
return
js_mkundef
();
if
(
js_truthy
(
js
,
js_get
(
js
,
stream_obj
,
"destroyed"
)))
return
js_mkundef
();
if
(
js_truthy
(
js
,
js_get
(
js
,
state
,
"reading"
)))
return
js_mkundef
();
if
(
js_truthy
(
js
,
js_get
(
js
,
state
,
"ended"
)))
return
js_mkundef
();
if
(
stream_readable_buffer_len
(
js
,
stream_obj
)
>
0
)
return
js_mkundef
();
read_fn
=
js_getprop_fallback
(
js
,
stream_obj
,
"_read"
);
js_set
(
js
,
state
,
"reading"
,
js_true
);
args
[
0
]
=
js_get
(
js
,
state
,
"highWaterMark"
);
if
(
is_callable
(
read_fn
))
stream_call
(
js
,
read_fn
,
stream_obj
,
args
,
1
,
false
);
js_set
(
js
,
state
,
"reading"
,
js_false
);
return
js_mkundef
();
}
ant_value_t
stream_readable_push_value
(
ant_t
*
js
,
ant_value_t
stream_obj
,
ant_value_t
chunk
,
ant_value_t
encoding
)
{
ant_value_t
state
=
stream_readable_state
(
js
,
stream_obj
);
ant_value_t
normalized
=
0
;
if
(
!
is_object_type
(
state
))
return
js_false
;
if
(
js_truthy
(
js
,
js_get
(
js
,
stream_obj
,
"destroyed"
)))
return
js_false
;
if
(
is_null
(
chunk
))
{
js_set
(
js
,
state
,
"ended"
,
js_true
);
stream_readable_flush
(
js
,
stream_obj
);
return
js_false
;
}
normalized
=
stream_normalize_chunk
(
js
,
chunk
,
js_truthy
(
js
,
js_get
(
js
,
state
,
"objectMode"
)),
is_undefined
(
encoding
)
?
js_mkstr
(
js
,
"utf8"
,
4
)
:
encoding
);
if
(
is_err
(
normalized
))
return
normalized
;
stream_buffer_push
(
js
,
stream_obj
,
normalized
);
if
(
js_truthy
(
js
,
js_get
(
js
,
state
,
"flowing"
)))
stream_readable_flush
(
js
,
stream_obj
);
return
js_bool
(
js_truthy
(
js
,
js_get
(
js
,
state
,
"flowing"
)));
}
static
ant_value_t
js_readable_push
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"Readable"
);
ant_value_t
chunk
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
ant_value_t
encoding
=
nargs
>
1
?
args
[
1
]
:
js_mkundef
();
if
(
is_err
(
stream_obj
))
return
stream_obj
;
return
stream_readable_push_value
(
js
,
stream_obj
,
chunk
,
encoding
);
}
static
ant_value_t
js_readable_read
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"Readable"
);
ant_value_t
state
=
0
;
ant_value_t
chunk
=
0
;
if
(
is_err
(
stream_obj
))
return
stream_obj
;
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
!
is_object_type
(
state
))
return
js_mknull
();
if
(
stream_readable_buffer_len
(
js
,
stream_obj
)
==
0
)
stream_readable_maybe_read
(
js
,
stream_obj
);
if
(
stream_readable_buffer_len
(
js
,
stream_obj
)
==
0
)
return
js_mknull
();
chunk
=
stream_buffer_shift
(
js
,
stream_obj
);
chunk
=
stream_readable_decode_chunk
(
js
,
stream_obj
,
chunk
,
false
);
if
(
is_err
(
chunk
))
return
chunk
;
if
(
js_truthy
(
js
,
js_get
(
js
,
state
,
"flowing"
)))
stream_readable_flush
(
js
,
stream_obj
);
return
chunk
;
}
static
ant_value_t
js_readable_set_encoding
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"Readable"
);
ant_value_t
state
=
0
;
ant_value_t
decoder
=
0
;
ant_value_t
encoding
=
nargs
>
0
&&
!
is_undefined
(
args
[
0
])
?
args
[
0
]
:
js_mkstr
(
js
,
"utf8"
,
4
);
ant_value_t
encoding_str
=
0
;
if
(
is_err
(
stream_obj
))
return
stream_obj
;
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
!
is_object_type
(
state
))
return
stream_obj
;
decoder
=
string_decoder_create
(
js
,
encoding
);
if
(
is_err
(
decoder
))
return
decoder
;
encoding_str
=
js_tostring_val
(
js
,
encoding
);
if
(
is_err
(
encoding_str
))
return
encoding_str
;
js_set
(
js
,
state
,
"decoder"
,
decoder
);
js_set
(
js
,
stream_obj
,
"encoding"
,
encoding_str
);
js_set
(
js
,
stream_obj
,
"readableEncoding"
,
encoding_str
);
return
stream_obj
;
}
static
ant_value_t
js_readable_on
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"Readable"
);
ant_value_t
key
=
0
;
ant_value_t
state
=
0
;
if
(
is_err
(
stream_obj
))
return
stream_obj
;
if
(
nargs
<
2
)
return
js_mkerr
(
js
,
"on requires 2 arguments (event, listener)"
);
key
=
stream_event_key
(
js
,
args
[
0
]);
if
(
is_err
(
key
))
return
key
;
if
(
!
eventemitter_add_listener_val
(
js
,
stream_obj
,
key
,
args
[
1
],
false
))
return
js_mkerr
(
js
,
"listener must be a function"
);
if
(
stream_key_is_cstr
(
js
,
key
,
"data"
))
{
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
is_object_type
(
state
))
js_set
(
js
,
state
,
"flowing"
,
js_true
);
stream_schedule_microtask
(
js
,
stream_readable_start_flowing
,
stream_obj
);
}
return
stream_obj
;
}
static
ant_value_t
js_readable_resume
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"Readable"
);
ant_value_t
state
=
0
;
if
(
is_err
(
stream_obj
))
return
stream_obj
;
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
is_object_type
(
state
))
js_set
(
js
,
state
,
"flowing"
,
js_true
);
js_stream_resume
(
js
,
NULL
,
0
);
stream_readable_maybe_read
(
js
,
stream_obj
);
stream_readable_flush
(
js
,
stream_obj
);
return
stream_obj
;
}
static
ant_value_t
js_readable_pause
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"Readable"
);
ant_value_t
state
=
0
;
if
(
is_err
(
stream_obj
))
return
stream_obj
;
state
=
stream_readable_state
(
js
,
stream_obj
);
if
(
is_object_type
(
state
))
js_set
(
js
,
state
,
"flowing"
,
js_false
);
return
js_stream_pause
(
js
,
args
,
nargs
);
}
static
ant_value_t
js_writable__write
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
callback
=
nargs
>
2
?
args
[
2
]
:
js_mkundef
();
stream_call_callback
(
js
,
callback
,
NULL
,
0
);
return
js_mkundef
();
}
static
ant_value_t
js_writable__final
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
callback
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
stream_call_callback
(
js
,
callback
,
NULL
,
0
);
return
js_mkundef
();
}
static
ant_value_t
stream_writable_write_done
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
stream_obj
=
js_get
(
js
,
state_obj
,
"stream"
);
ant_value_t
callback
=
js_get
(
js
,
state_obj
,
"callback"
);
stream_private_state_t
*
priv
=
stream_private_state
(
stream_obj
);
ant_value_t
err
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
if
(
priv
)
priv
->
writing
=
false
;
if
(
!
is_undefined
(
err
)
&&
!
is_null
(
err
))
{
ant_value_t
destroy_args
[
1
]
=
{
err
};
js_set
(
js
,
state_obj
,
"done"
,
js_true
);
if
(
priv
)
priv
->
pending_final
=
false
;
{
ant_value_t
saved_this
=
js
->
this_val
;
js
->
this_val
=
stream_obj
;
js_stream_destroy
(
js
,
destroy_args
,
1
);
js
->
this_val
=
saved_this
;
}
if
(
is_callable
(
callback
))
stream_call_callback
(
js
,
callback
,
&
err
,
1
);
return
js_mkundef
();
}
if
(
is_callable
(
callback
))
stream_call_callback
(
js
,
callback
,
NULL
,
0
);
stream_emit_named
(
js
,
stream_obj
,
"drain"
);
if
(
priv
&&
priv
->
pending_final
&&
!
priv
->
final_started
)
{
ant_value_t
end_callback
=
js_get_slot
(
stream_obj
,
SLOT_AUX
);
priv
->
pending_final
=
false
;
stream_set_end_callback
(
js
,
stream_obj
,
js_mkundef
());
return
stream_writable_begin_end
(
js
,
stream_obj
,
end_callback
);
}
return
js_mkundef
();
}
static
ant_value_t
stream_writable_write_impl
(
ant_t
*
js
,
ant_value_t
stream_obj
,
ant_value_t
chunk
,
ant_value_t
encoding
,
ant_value_t
callback
,
bool
allow_after_end
)
{
ant_value_t
state
=
0
;
ant_value_t
normalized
=
0
;
ant_value_t
write_fn
=
0
;
ant_value_t
done_state
=
0
;
ant_value_t
done
=
0
;
ant_value_t
write_args
[
3
];
state
=
stream_writable_state
(
js
,
stream_obj
);
if
(
!
is_object_type
(
state
))
return
js_false
;
if
(
(
!
allow_after_end
&&
js_truthy
(
js
,
js_get
(
js
,
stream_obj
,
"writableEnded"
)))
||
js_truthy
(
js
,
js_get
(
js
,
stream_obj
,
"destroyed"
))
)
{
ant_value_t
err
=
js_mkerr
(
js
,
"write after end"
);
if
(
is_callable
(
callback
))
stream_call_callback
(
js
,
callback
,
&
err
,
1
);
else
stream_emit_error
(
js
,
stream_obj
,
err
);
return
js_false
;
}
normalized
=
stream_normalize_chunk
(
js
,
chunk
,
js_truthy
(
js
,
js_get
(
js
,
state
,
"objectMode"
)),
encoding
);
if
(
is_err
(
normalized
))
return
normalized
;
done_state
=
js_mkobj
(
js
);
js_set
(
js
,
done_state
,
"stream"
,
stream_obj
);
js_set
(
js
,
done_state
,
"callback"
,
callback
);
js_set
(
js
,
done_state
,
"done"
,
js_false
);
done
=
stream_make_once
(
js
,
js_heavy_mkfun
(
js
,
stream_writable_write_done
,
done_state
),
js_mkundef
());
write_fn
=
js_getprop_fallback
(
js
,
stream_obj
,
"_write"
);
stream_private_state_t
*
priv
=
stream_private_state
(
stream_obj
);
if
(
priv
)
priv
->
writing
=
true
;
write_args
[
0
]
=
normalized
;
write_args
[
1
]
=
encoding
;
write_args
[
2
]
=
done
;
if
(
is_callable
(
write_fn
))
{
ant_value_t
result
=
stream_call
(
js
,
write_fn
,
stream_obj
,
write_args
,
3
,
false
);
if
(
is_err
(
result
))
{
ant_value_t
err_args
[
1
]
=
{
result
};
stream_call_callback
(
js
,
done
,
err_args
,
1
);
return
js_false
;
}}
return
js_bool
(
!
js_truthy
(
js
,
js_get
(
js
,
stream_obj
,
"destroyed"
)));
}
static
ant_value_t
js_writable_write
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"Writable"
);
ant_value_t
callback
=
js_mkundef
();
ant_value_t
encoding
=
js_mkstr
(
js
,
"utf8"
,
4
);
if
(
is_err
(
stream_obj
))
return
stream_obj
;
if
(
nargs
>
1
&&
is_callable
(
args
[
1
]))
callback
=
args
[
1
];
else
if
(
nargs
>
1
&&
!
is_undefined
(
args
[
1
]))
encoding
=
args
[
1
];
if
(
nargs
>
2
&&
is_callable
(
args
[
2
]))
callback
=
args
[
2
];
return
stream_writable_write_impl
(
js
,
stream_obj
,
nargs
>
0
?
args
[
0
]
:
js_mkundef
(),
encoding
,
callback
,
false
);
}
static
ant_value_t
stream_writable_end_done
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
stream_obj
=
js_get
(
js
,
state_obj
,
"stream"
);
ant_value_t
callback
=
js_get
(
js
,
state_obj
,
"callback"
);
ant_value_t
err
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
if
(
!
is_undefined
(
err
)
&&
!
is_null
(
err
))
{
ant_value_t
destroy_args
[
1
]
=
{
err
};
ant_value_t
saved_this
=
js
->
this_val
;
js
->
this_val
=
stream_obj
;
js_stream_destroy
(
js
,
destroy_args
,
1
);
js
->
this_val
=
saved_this
;
if
(
is_callable
(
callback
))
stream_call_callback
(
js
,
callback
,
&
err
,
1
);
return
js_mkundef
();
}
js_set
(
js
,
stream_obj
,
"writableFinished"
,
js_true
);
js_set
(
js
,
stream_writable_state
(
js
,
stream_obj
),
"finished"
,
js_true
);
stream_emit_named
(
js
,
stream_obj
,
"finish"
);
if
(
is_callable
(
callback
))
stream_call_callback
(
js
,
callback
,
NULL
,
0
);
if
(
!
js_truthy
(
js
,
js_get
(
js
,
stream_obj
,
"readable"
)))
stream_emit_named
(
js
,
stream_obj
,
"close"
);
return
js_mkundef
();
}
ant_value_t
stream_writable_begin_end
(
ant_t
*
js
,
ant_value_t
stream_obj
,
ant_value_t
callback
)
{
ant_value_t
final_fn
=
0
;
ant_value_t
final_args
[
1
];
ant_value_t
done_state
=
0
;
ant_value_t
done
=
0
;
stream_private_state_t
*
priv
=
stream_private_state
(
stream_obj
);
done_state
=
js_mkobj
(
js
);
js_set
(
js
,
done_state
,
"stream"
,
stream_obj
);
js_set
(
js
,
done_state
,
"callback"
,
callback
);
done
=
stream_make_once
(
js
,
js_heavy_mkfun
(
js
,
stream_writable_end_done
,
done_state
),
js_mkundef
());
if
(
priv
)
{
priv
->
final_started
=
true
;
priv
->
pending_final
=
false
;
}
stream_set_end_callback
(
js
,
stream_obj
,
js_mkundef
());
final_fn
=
js_getprop_fallback
(
js
,
stream_obj
,
"_final"
);
final_args
[
0
]
=
done
;
if
(
is_callable
(
final_fn
))
stream_call
(
js
,
final_fn
,
stream_obj
,
final_args
,
1
,
false
);
else
stream_call_callback
(
js
,
done
,
NULL
,
0
);
return
stream_obj
;
}
static
ant_value_t
stream_writable_end_after_write
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
stream_obj
=
js_get
(
js
,
state_obj
,
"stream"
);
ant_value_t
callback
=
js_get
(
js
,
state_obj
,
"callback"
);
stream_private_state_t
*
priv
=
stream_private_state
(
stream_obj
);
ant_value_t
err
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
if
(
!
is_undefined
(
err
)
&&
!
is_null
(
err
))
{
ant_value_t
destroy_args
[
1
]
=
{
err
};
ant_value_t
saved_this
=
js
->
this_val
;
js
->
this_val
=
stream_obj
;
js_stream_destroy
(
js
,
destroy_args
,
1
);
js
->
this_val
=
saved_this
;
if
(
is_callable
(
callback
))
stream_call_callback
(
js
,
callback
,
&
err
,
1
);
return
js_mkundef
();
}
if
(
priv
)
priv
->
pending_final
=
false
;
stream_set_end_callback
(
js
,
stream_obj
,
js_mkundef
());
return
stream_writable_begin_end
(
js
,
stream_obj
,
callback
);
}
static
ant_value_t
js_writable_end
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"Writable"
);
ant_value_t
callback
=
js_mkundef
();
ant_value_t
chunk
=
js_mkundef
();
ant_value_t
encoding
=
js_mkundef
();
ant_value_t
after_write_state
=
0
;
ant_value_t
after_write
=
0
;
stream_private_state_t
*
priv
=
NULL
;
if
(
is_err
(
stream_obj
))
return
stream_obj
;
if
(
nargs
>
0
&&
is_callable
(
args
[
0
]))
callback
=
args
[
0
];
else
{
if
(
nargs
>
0
)
chunk
=
args
[
0
];
if
(
nargs
>
1
&&
is_callable
(
args
[
1
]))
callback
=
args
[
1
];
else
if
(
nargs
>
1
)
encoding
=
args
[
1
];
if
(
nargs
>
2
&&
is_callable
(
args
[
2
]))
callback
=
args
[
2
];
}
if
(
js_truthy
(
js
,
js_get
(
js
,
stream_obj
,
"writableEnded"
)))
{
if
(
is_callable
(
callback
))
stream_call_callback
(
js
,
callback
,
NULL
,
0
);
return
stream_obj
;
}
js_set
(
js
,
stream_obj
,
"writableEnded"
,
js_true
);
js_set
(
js
,
stream_writable_state
(
js
,
stream_obj
),
"ended"
,
js_true
);
priv
=
stream_private_state
(
stream_obj
);
if
(
!
is_undefined
(
chunk
)
&&
!
is_null
(
chunk
))
{
after_write_state
=
js_mkobj
(
js
);
js_set
(
js
,
after_write_state
,
"stream"
,
stream_obj
);
js_set
(
js
,
after_write_state
,
"callback"
,
callback
);
after_write
=
stream_make_once
(
js
,
js_heavy_mkfun
(
js
,
stream_writable_end_after_write
,
after_write_state
),
js_mkundef
()
);
stream_writable_write_impl
(
js
,
stream_obj
,
chunk
,
encoding
,
after_write
,
true
);
return
stream_obj
;
}
if
(
priv
&&
priv
->
writing
&&
!
priv
->
final_started
)
{
priv
->
pending_final
=
true
;
stream_set_end_callback
(
js
,
stream_obj
,
callback
);
return
stream_obj
;
}
return
stream_writable_begin_end
(
js
,
stream_obj
,
callback
);
}
static
ant_value_t
js_transform__transform
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
callback
=
nargs
>
2
?
args
[
2
]
:
js_mkundef
();
ant_value_t
cb_args
[
2
];
cb_args
[
0
]
=
js_mknull
();
cb_args
[
1
]
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
stream_call_callback
(
js
,
callback
,
cb_args
,
2
);
return
js_mkundef
();
}
static
ant_value_t
stream_transform_write_callback
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
stream_obj
=
js_get
(
js
,
state_obj
,
"stream"
);
ant_value_t
outer_callback
=
js_get
(
js
,
state_obj
,
"callback"
);
if
(
nargs
>
0
&&
!
is_null
(
args
[
0
])
&&
!
is_undefined
(
args
[
0
]))
{
if
(
is_callable
(
outer_callback
))
stream_call
(
js
,
outer_callback
,
stream_obj
,
&
args
[
0
],
1
,
false
);
return
js_mkundef
();
}
if
(
nargs
>
1
&&
!
is_null
(
args
[
1
])
&&
!
is_undefined
(
args
[
1
]))
stream_readable_push_value
(
js
,
stream_obj
,
args
[
1
],
js_mkundef
());
if
(
is_callable
(
outer_callback
))
stream_call
(
js
,
outer_callback
,
stream_obj
,
NULL
,
0
,
false
);
return
js_mkundef
();
}
static
ant_value_t
js_transform__write
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"Transform"
);
ant_value_t
transform_fn
=
0
;
ant_value_t
cb_state
=
0
;
ant_value_t
cb
=
0
;
ant_value_t
call_args
[
3
];
if
(
is_err
(
stream_obj
))
return
stream_obj
;
transform_fn
=
js_getprop_fallback
(
js
,
stream_obj
,
"_transform"
);
cb_state
=
js_mkobj
(
js
);
js_set
(
js
,
cb_state
,
"stream"
,
stream_obj
);
js_set
(
js
,
cb_state
,
"callback"
,
nargs
>
2
?
args
[
2
]
:
js_mkundef
());
cb
=
js_heavy_mkfun
(
js
,
stream_transform_write_callback
,
cb_state
);
call_args
[
0
]
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
call_args
[
1
]
=
nargs
>
1
?
args
[
1
]
:
js_mkstr
(
js
,
"utf8"
,
4
);
call_args
[
2
]
=
cb
;
return
stream_call
(
js
,
transform_fn
,
stream_obj
,
call_args
,
3
,
false
);
}
static
ant_value_t
stream_transform_final_callback
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
stream_obj
=
js_get
(
js
,
state_obj
,
"stream"
);
ant_value_t
callback
=
js_get
(
js
,
state_obj
,
"callback"
);
if
(
nargs
>
0
&&
!
is_null
(
args
[
0
])
&&
!
is_undefined
(
args
[
0
]))
{
if
(
is_callable
(
callback
))
stream_call
(
js
,
callback
,
stream_obj
,
&
args
[
0
],
1
,
false
);
return
js_mkundef
();
}
if
(
nargs
>
1
&&
!
is_null
(
args
[
1
])
&&
!
is_undefined
(
args
[
1
]))
stream_readable_push_value
(
js
,
stream_obj
,
args
[
1
],
js_mkundef
());
stream_readable_push_value
(
js
,
stream_obj
,
js_mknull
(),
js_mkundef
());
if
(
is_callable
(
callback
))
stream_call
(
js
,
callback
,
stream_obj
,
NULL
,
0
,
false
);
return
js_mkundef
();
}
static
ant_value_t
js_transform__final
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
stream_obj
=
stream_require_this
(
js
,
js_getthis
(
js
),
"Transform"
);
ant_value_t
flush_fn
=
0
;
ant_value_t
cb_state
=
0
;
ant_value_t
cb
=
0
;
ant_value_t
call_args
[
1
];
if
(
is_err
(
stream_obj
))
return
stream_obj
;
flush_fn
=
js_getprop_fallback
(
js
,
stream_obj
,
"_flush"
);
if
(
!
is_callable
(
flush_fn
))
{
stream_readable_push_value
(
js
,
stream_obj
,
js_mknull
(),
js_mkundef
());
stream_call_callback
(
js
,
nargs
>
0
?
args
[
0
]
:
js_mkundef
(),
NULL
,
0
);
return
js_mkundef
();
}
cb_state
=
js_mkobj
(
js
);
js_set
(
js
,
cb_state
,
"stream"
,
stream_obj
);
js_set
(
js
,
cb_state
,
"callback"
,
nargs
>
0
?
args
[
0
]
:
js_mkundef
());
cb
=
js_heavy_mkfun
(
js
,
stream_transform_final_callback
,
cb_state
);
call_args
[
0
]
=
cb
;
return
stream_call
(
js
,
flush_fn
,
stream_obj
,
call_args
,
1
,
false
);
}
static
ant_value_t
js_passthrough__transform
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
return
js_transform__transform
(
js
,
args
,
nargs
);
}
static
ant_value_t
stream_finished_cleanup
(
ant_t
*
js
,
ant_value_t
state_obj
)
{
ant_value_t
stream_obj
=
js_get
(
js
,
state_obj
,
"stream"
);
if
(
stream_is_instance
(
stream_obj
)
||
is_object_type
(
stream_obj
))
{
stream_remove_listener
(
js
,
stream_obj
,
"end"
,
js_get
(
js
,
state_obj
,
"onFinish"
));
stream_remove_listener
(
js
,
stream_obj
,
"finish"
,
js_get
(
js
,
state_obj
,
"onFinish"
));
stream_remove_listener
(
js
,
stream_obj
,
"close"
,
js_get
(
js
,
state_obj
,
"onFinish"
));
stream_remove_listener
(
js
,
stream_obj
,
"error"
,
js_get
(
js
,
state_obj
,
"onError"
));
}
return
js_mkundef
();
}
static
ant_value_t
stream_finished_fire
(
ant_t
*
js
,
ant_value_t
state_obj
,
ant_value_t
error
)
{
ant_value_t
called
=
js_get
(
js
,
state_obj
,
"called"
);
ant_value_t
callback
=
js_get
(
js
,
state_obj
,
"callback"
);
ant_value_t
cb_args
[
1
];
if
(
js_truthy
(
js
,
called
))
return
js_mkundef
();
js_set
(
js
,
state_obj
,
"called"
,
js_true
);
stream_finished_cleanup
(
js
,
state_obj
);
if
(
is_undefined
(
error
))
stream_call_callback
(
js
,
callback
,
NULL
,
0
);
else
{
cb_args
[
0
]
=
error
;
stream_call_callback
(
js
,
callback
,
cb_args
,
1
);
}
return
js_mkundef
();
}
static
ant_value_t
stream_finished_on_finish
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
return
stream_finished_fire
(
js
,
state_obj
,
js_mkundef
());
}
static
ant_value_t
stream_finished_on_error
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
error
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
return
stream_finished_fire
(
js
,
state_obj
,
error
);
}
static
ant_value_t
stream_finished_register
(
ant_t
*
js
,
ant_value_t
stream_obj
,
ant_value_t
callback
)
{
ant_value_t
state_obj
=
js_mkobj
(
js
);
ant_value_t
on_finish
=
0
;
ant_value_t
on_error
=
0
;
if
(
!
is_callable
(
callback
))
callback
=
js_mkfun
(
stream_noop
);
js_set
(
js
,
state_obj
,
"stream"
,
stream_obj
);
js_set
(
js
,
state_obj
,
"callback"
,
callback
);
js_set
(
js
,
state_obj
,
"called"
,
js_false
);
on_finish
=
js_heavy_mkfun
(
js
,
stream_finished_on_finish
,
state_obj
);
on_error
=
js_heavy_mkfun
(
js
,
stream_finished_on_error
,
state_obj
);
js_set
(
js
,
state_obj
,
"onFinish"
,
on_finish
);
js_set
(
js
,
state_obj
,
"onError"
,
on_error
);
eventemitter_add_listener
(
js
,
stream_obj
,
"end"
,
on_finish
,
false
);
eventemitter_add_listener
(
js
,
stream_obj
,
"finish"
,
on_finish
,
false
);
eventemitter_add_listener
(
js
,
stream_obj
,
"close"
,
on_finish
,
false
);
eventemitter_add_listener
(
js
,
stream_obj
,
"error"
,
on_error
,
false
);
return
stream_obj
;
}
static
ant_value_t
js_stream_finished
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
callback
=
nargs
>
1
?
args
[
1
]
:
js_mkundef
();
if
(
nargs
<
1
||
!
is_object_type
(
args
[
0
]))
return
js_mkerr
(
js
,
"finished requires a stream"
);
return
stream_finished_register
(
js
,
args
[
0
],
callback
);
}
static
ant_value_t
stream_pipeline_done
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
callback
=
js_get
(
js
,
state_obj
,
"callback"
);
ant_value_t
called
=
js_get
(
js
,
state_obj
,
"called"
);
ant_value_t
cb_args
[
1
];
if
(
js_truthy
(
js
,
called
))
return
js_mkundef
();
js_set
(
js
,
state_obj
,
"called"
,
js_true
);
if
(
nargs
>
0
&&
!
is_undefined
(
args
[
0
]))
{
cb_args
[
0
]
=
args
[
0
];
stream_call_callback
(
js
,
callback
,
cb_args
,
1
);
}
else
stream_call_callback
(
js
,
callback
,
NULL
,
0
);
return
js_mkundef
();
}
static
ant_value_t
stream_pipeline_error
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
done
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
if
(
nargs
>
0
&&
!
is_undefined
(
args
[
0
]))
stream_call_callback
(
js
,
done
,
&
args
[
0
],
1
);
return
js_mkundef
();
}
static
ant_value_t
stream_pipeline_schedule_done
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
done
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
stream_call_callback
(
js
,
done
,
NULL
,
0
);
return
js_mkundef
();
}
static
ant_value_t
js_stream_pipeline
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
int
stream_count
=
nargs
;
ant_value_t
callback
=
js_mkundef
();
ant_value_t
done_state
=
0
;
ant_value_t
done
=
0
;
if
(
nargs
>
0
&&
is_callable
(
args
[
nargs
-
1
]))
{
callback
=
args
[
nargs
-
1
];
stream_count
--
;
}
if
(
!
is_callable
(
callback
))
callback
=
js_mkfun
(
stream_noop
);
if
(
stream_count
<=
0
)
return
js_mkundef
();
done_state
=
js_mkobj
(
js
);
js_set
(
js
,
done_state
,
"callback"
,
callback
);
js_set
(
js
,
done_state
,
"called"
,
js_false
);
done
=
js_heavy_mkfun
(
js
,
stream_pipeline_done
,
done_state
);
if
(
stream_count
<
2
)
{
stream_schedule_microtask
(
js
,
stream_pipeline_schedule_done
,
done
);
return
args
[
0
];
}
for
(
int
i
=
0
;
i
<
stream_count
-
1
;
i
++
)
{
ant_value_t
error_cb
=
js_heavy_mkfun
(
js
,
stream_pipeline_error
,
done
);
ant_value_t
finished_args
[
2
];
finished_args
[
0
]
=
args
[
i
];
finished_args
[
1
]
=
error_cb
;
js_stream_finished
(
js
,
finished_args
,
2
);
ant_value_t
pipe_args
[
2
];
pipe_args
[
0
]
=
args
[
i
+
1
];
pipe_args
[
1
]
=
js_mkundef
();
stream_call_prop
(
js
,
args
[
i
],
"pipe"
,
pipe_args
,
2
);
}
{
ant_value_t
finished_args
[
2
];
finished_args
[
0
]
=
args
[
stream_count
-
1
];
finished_args
[
1
]
=
done
;
js_stream_finished
(
js
,
finished_args
,
2
);
}
return
args
[
stream_count
-
1
];
}
static
ant_value_t
stream_promise_callback
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
promise
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
if
(
nargs
>
0
&&
!
is_undefined
(
args
[
0
]))
js_reject_promise
(
js
,
promise
,
args
[
0
]);
else
js_resolve_promise
(
js
,
promise
,
js_mkundef
());
return
js_mkundef
();
}
static
ant_value_t
js_stream_promises_finished
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
promise
=
js_mkpromise
(
js
);
ant_value_t
finished_args
[
2
];
if
(
nargs
<
1
||
!
is_object_type
(
args
[
0
]))
{
js_reject_promise
(
js
,
promise
,
js_mkerr
(
js
,
"finished requires a stream"
));
return
promise
;
}
finished_args
[
0
]
=
args
[
0
];
finished_args
[
1
]
=
js_heavy_mkfun
(
js
,
stream_promise_callback
,
promise
);
js_stream_finished
(
js
,
finished_args
,
2
);
return
promise
;
}
static
ant_value_t
js_stream_promises_pipeline
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
promise
=
js_mkpromise
(
js
);
ant_value_t
*
call_args
=
NULL
;
if
(
nargs
<=
0
)
{
js_resolve_promise
(
js
,
promise
,
js_mkundef
());
return
promise
;
}
call_args
=
malloc
((
size_t
)(
nargs
+
1
)
*
sizeof
(
*
call_args
));
if
(
!
call_args
)
{
js_reject_promise
(
js
,
promise
,
js_mkerr
(
js
,
"out of memory"
));
return
promise
;
}
for
(
int
i
=
0
;
i
<
nargs
;
i
++
)
call_args
[
i
]
=
args
[
i
];
call_args
[
nargs
]
=
js_heavy_mkfun
(
js
,
stream_promise_callback
,
promise
);
js_stream_pipeline
(
js
,
call_args
,
nargs
+
1
);
free
(
call_args
);
return
promise
;
}
static
void
stream_release_reader
(
ant_t
*
js
,
ant_value_t
state_obj
)
{
ant_value_t
reader
=
js_get
(
js
,
state_obj
,
"reader"
);
if
(
!
is_object_type
(
reader
))
return
;
stream_call_prop
(
js
,
reader
,
"releaseLock"
,
NULL
,
0
);
}
static
ant_value_t
stream_readable_from_step
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
);
static
void
stream_readable_from_schedule
(
ant_t
*
js
,
ant_value_t
state_obj
)
{
stream_schedule_microtask
(
js
,
stream_readable_from_step
,
state_obj
);
}
static
ant_value_t
stream_readable_from_fail
(
ant_t
*
js
,
ant_value_t
state_obj
,
ant_value_t
error
)
{
ant_value_t
readable
=
js_get
(
js
,
state_obj
,
"readable"
);
stream_release_reader
(
js
,
state_obj
);
if
(
stream_is_instance
(
readable
))
{
ant_value_t
destroy_args
[
1
]
=
{
error
};
ant_value_t
saved_this
=
js
->
this_val
;
js
->
this_val
=
readable
;
js_stream_destroy
(
js
,
destroy_args
,
1
);
js
->
this_val
=
saved_this
;
}
return
js_mkundef
();
}
static
ant_value_t
stream_readable_from_handle_result
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
readable
=
js_get
(
js
,
state_obj
,
"readable"
);
ant_value_t
result
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
ant_value_t
done
=
0
;
ant_value_t
value
=
0
;
if
(
js_truthy
(
js
,
js_get
(
js
,
readable
,
"destroyed"
)))
{
stream_release_reader
(
js
,
state_obj
);
return
js_mkundef
();
}
if
(
!
is_object_type
(
result
))
return
stream_readable_from_fail
(
js
,
state_obj
,
js_mkerr
(
js
,
"iterator step must be an object"
));
done
=
js_get
(
js
,
result
,
"done"
);
value
=
js_get
(
js
,
result
,
"value"
);
if
(
js_truthy
(
js
,
done
))
{
stream_release_reader
(
js
,
state_obj
);
stream_readable_push_value
(
js
,
readable
,
js_mknull
(),
js_mkundef
());
return
js_mkundef
();
}
stream_readable_push_value
(
js
,
readable
,
value
,
js_mkundef
());
if
(
!
js_truthy
(
js
,
js_get
(
js
,
readable
,
"destroyed"
)))
stream_readable_from_schedule
(
js
,
state_obj
);
return
js_mkundef
();
}
static
ant_value_t
stream_readable_from_reject
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
error
=
nargs
>
0
?
args
[
0
]
:
js_mkerr
(
js
,
"stream iteration failed"
);
return
stream_readable_from_fail
(
js
,
state_obj
,
error
);
}
static
ant_value_t
stream_readable_from_step
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
readable
=
js_get
(
js
,
state_obj
,
"readable"
);
ant_value_t
mode
=
js_get
(
js
,
state_obj
,
"mode"
);
ant_value_t
iterator
=
js_get
(
js
,
state_obj
,
"iterator"
);
ant_value_t
next_result
=
js_mkundef
();
ant_value_t
on_resolve
=
js_heavy_mkfun
(
js
,
stream_readable_from_handle_result
,
state_obj
);
ant_value_t
on_reject
=
js_heavy_mkfun
(
js
,
stream_readable_from_reject
,
state_obj
);
ant_value_t
then_result
=
0
;
if
(
js_truthy
(
js
,
js_get
(
js
,
readable
,
"destroyed"
)))
{
stream_release_reader
(
js
,
state_obj
);
return
js_mkundef
();
}
if
(
stream_key_is_cstr
(
js
,
mode
,
"reader"
))
next_result
=
stream_call_prop
(
js
,
iterator
,
"read"
,
NULL
,
0
);
else
next_result
=
stream_call_prop
(
js
,
iterator
,
"next"
,
NULL
,
0
);
if
(
is_err
(
next_result
))
return
stream_readable_from_fail
(
js
,
state_obj
,
next_result
);
if
(
vtype
(
next_result
)
==
T_PROMISE
)
{
then_result
=
js_promise_then
(
js
,
next_result
,
on_resolve
,
on_reject
);
promise_mark_handled
(
then_result
);
return
js_mkundef
();
}
ant_value_t
one_arg
[
1
]
=
{
next_result
};
return
stream_readable_from_handle_result
(
js
,
one_arg
,
1
);
}
static
ant_value_t
stream_readable_from_start
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
state_obj
=
js_get_slot
(
js_getcurrentfunc
(
js
),
SLOT_DATA
);
ant_value_t
readable
=
js_get
(
js
,
state_obj
,
"readable"
);
ant_value_t
source
=
js_get
(
js
,
state_obj
,
"source"
);
ant_value_t
async_iter_fn
=
0
;
ant_value_t
reader_fn
=
0
;
js_iter_t
it
;
if
(
js_truthy
(
js
,
js_get
(
js
,
readable
,
"destroyed"
)))
return
js_mkundef
();
async_iter_fn
=
is_object_type
(
source
)
?
js_get_sym
(
js
,
source
,
get_asyncIterator_sym
())
:
js_mkundef
();
if
(
is_callable
(
async_iter_fn
))
{
ant_value_t
iterator
=
stream_call
(
js
,
async_iter_fn
,
source
,
NULL
,
0
,
false
);
if
(
is_err
(
iterator
))
return
stream_readable_from_fail
(
js
,
state_obj
,
iterator
);
js_set
(
js
,
state_obj
,
"iterator"
,
iterator
);
js_set
(
js
,
state_obj
,
"mode"
,
js_mkstr
(
js
,
"async"
,
5
));
stream_readable_from_schedule
(
js
,
state_obj
);
return
js_mkundef
();
}
if
(
js_iter_open
(
js
,
source
,
&
it
))
{
ant_value_t
value
=
0
;
while
(
js_iter_next
(
js
,
&
it
,
&
value
))
{
if
(
js_truthy
(
js
,
js_get
(
js
,
readable
,
"destroyed"
)))
break
;
stream_readable_push_value
(
js
,
readable
,
value
,
js_mkundef
());
}
js_iter_close
(
js
,
&
it
);
if
(
!
js_truthy
(
js
,
js_get
(
js
,
readable
,
"destroyed"
)))
stream_readable_push_value
(
js
,
readable
,
js_mknull
(),
js_mkundef
());
return
js_mkundef
();
}
reader_fn
=
is_object_type
(
source
)
?
js_get
(
js
,
source
,
"getReader"
)
:
js_mkundef
();
if
(
is_callable
(
reader_fn
))
{
ant_value_t
reader
=
stream_call
(
js
,
reader_fn
,
source
,
NULL
,
0
,
false
);
if
(
is_err
(
reader
))
return
stream_readable_from_fail
(
js
,
state_obj
,
reader
);
js_set
(
js
,
state_obj
,
"reader"
,
reader
);
js_set
(
js
,
state_obj
,
"iterator"
,
reader
);
js_set
(
js
,
state_obj
,
"mode"
,
js_mkstr
(
js
,
"reader"
,
6
));
stream_readable_from_schedule
(
js
,
state_obj
);
return
js_mkundef
();
}
if
(
!
is_undefined
(
source
))
stream_readable_push_value
(
js
,
readable
,
source
,
js_mkundef
());
stream_readable_push_value
(
js
,
readable
,
js_mknull
(),
js_mkundef
());
return
js_mkundef
();
}
static
ant_value_t
js_readable_from
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
ctor_args
[
1
];
ant_value_t
readable
=
0
;
ant_value_t
state_obj
=
0
;
ctor_args
[
0
]
=
nargs
>
1
?
args
[
1
]
:
js_mkundef
();
readable
=
stream_construct
(
js
,
g_readable_proto
,
ctor_args
[
0
],
stream_init_readable
);
if
(
is_err
(
readable
))
return
readable
;
state_obj
=
js_mkobj
(
js
);
js_set
(
js
,
state_obj
,
"readable"
,
readable
);
js_set
(
js
,
state_obj
,
"source"
,
nargs
>
0
?
args
[
0
]
:
js_mkundef
());
js_set
(
js
,
state_obj
,
"iterator"
,
js_mkundef
());
js_set
(
js
,
state_obj
,
"reader"
,
js_mkundef
());
js_set
(
js
,
state_obj
,
"mode"
,
js_mkundef
());
stream_schedule_microtask
(
js
,
stream_readable_from_start
,
state_obj
);
return
readable
;
}
static
ant_value_t
js_readable_from_web
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
return
js_readable_from
(
js
,
args
,
nargs
);
}
static
ant_value_t
js_stream_ctor
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
return
stream_construct
(
js
,
g_stream_proto
,
nargs
>
0
?
args
[
0
]
:
js_mkundef
(),
stream_init_base
);
}
static
ant_value_t
js_readable_ctor
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
return
stream_construct
(
js
,
g_readable_proto
,
nargs
>
0
?
args
[
0
]
:
js_mkundef
(),
stream_init_readable
);
}
static
ant_value_t
js_writable_ctor
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
return
stream_construct
(
js
,
g_writable_proto
,
nargs
>
0
?
args
[
0
]
:
js_mkundef
(),
stream_init_writable
);
}
static
ant_value_t
js_duplex_ctor
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
proto
=
js_instance_proto_from_new_target
(
js
,
g_duplex_proto
);
ant_value_t
obj
=
stream_make_base_object
(
js
,
is_object_type
(
proto
)
?
proto
:
g_duplex_proto
);
ant_value_t
options
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
ant_value_t
options_obj
=
is_object_type
(
options
)
?
options
:
js_mkobj
(
js
);
stream_init_readable
(
js
,
obj
,
options
);
stream_init_writable
(
js
,
obj
,
options
);
js_set
(
js
,
obj
,
"readable"
,
js_true
);
js_set
(
js
,
obj
,
"writable"
,
js_true
);
js_set
(
js
,
obj
,
"allowHalfOpen"
,
js_bool
(
stream_get_option
(
js
,
options_obj
,
"allowHalfOpen"
)
!=
js_false
));
return
obj
;
}
static
ant_value_t
js_transform_ctor
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
proto
=
js_instance_proto_from_new_target
(
js
,
g_transform_proto
);
ant_value_t
obj
=
stream_make_base_object
(
js
,
is_object_type
(
proto
)
?
proto
:
g_transform_proto
);
ant_value_t
options
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
ant_value_t
options_obj
=
is_object_type
(
options
)
?
options
:
js_mkobj
(
js
);
ant_value_t
transform_fn
=
0
;
ant_value_t
flush_fn
=
0
;
stream_init_readable
(
js
,
obj
,
options
);
stream_init_writable
(
js
,
obj
,
options
);
js_set
(
js
,
obj
,
"readable"
,
js_true
);
js_set
(
js
,
obj
,
"writable"
,
js_true
);
js_set
(
js
,
obj
,
"allowHalfOpen"
,
js_bool
(
stream_get_option
(
js
,
options_obj
,
"allowHalfOpen"
)
!=
js_false
));
transform_fn
=
stream_get_option
(
js
,
options_obj
,
"transform"
);
flush_fn
=
stream_get_option
(
js
,
options_obj
,
"flush"
);
if
(
is_callable
(
transform_fn
))
js_set
(
js
,
obj
,
"_transform"
,
transform_fn
);
if
(
is_callable
(
flush_fn
))
js_set
(
js
,
obj
,
"_flush"
,
flush_fn
);
return
obj
;
}
static
ant_value_t
js_passthrough_ctor
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
ant_value_t
proto
=
js_instance_proto_from_new_target
(
js
,
g_passthrough_proto
);
ant_value_t
obj
=
stream_make_base_object
(
js
,
is_object_type
(
proto
)
?
proto
:
g_passthrough_proto
);
ant_value_t
options
=
nargs
>
0
?
args
[
0
]
:
js_mkundef
();
ant_value_t
options_obj
=
is_object_type
(
options
)
?
options
:
js_mkobj
(
js
);
stream_init_readable
(
js
,
obj
,
options
);
stream_init_writable
(
js
,
obj
,
options
);
js_set
(
js
,
obj
,
"readable"
,
js_true
);
js_set
(
js
,
obj
,
"writable"
,
js_true
);
js_set
(
js
,
obj
,
"allowHalfOpen"
,
js_bool
(
stream_get_option
(
js
,
options_obj
,
"allowHalfOpen"
)
!=
js_false
));
return
obj
;
}
void
stream_init_constructors
(
ant_t
*
js
)
{
ant_value_t
events
=
0
;
ant_value_t
ee_ctor
=
0
;
ant_value_t
ee_proto
=
0
;
if
(
g_stream_ctor
)
return
;
events
=
events_library
(
js
);
ee_ctor
=
js_get
(
js
,
events
,
"EventEmitter"
);
ee_proto
=
js_get
(
js
,
ee_ctor
,
"prototype"
);
g_stream_proto
=
js_mkobj
(
js
);
js_set_proto_init
(
g_stream_proto
,
ee_proto
);
js_set
(
js
,
g_stream_proto
,
"pipe"
,
js_mkfun
(
js_stream_pipe
));
js_set
(
js
,
g_stream_proto
,
"unpipe"
,
js_mkfun
(
js_stream_unpipe
));
js_set
(
js
,
g_stream_proto
,
"pause"
,
js_mkfun
(
js_stream_pause
));
js_set
(
js
,
g_stream_proto
,
"resume"
,
js_mkfun
(
js_stream_resume
));
js_set
(
js
,
g_stream_proto
,
"isPaused"
,
js_mkfun
(
js_stream_is_paused
));
js_set
(
js
,
g_stream_proto
,
"destroy"
,
js_mkfun
(
js_stream_destroy
));
js_set_sym
(
js
,
g_stream_proto
,
get_toStringTag_sym
(),
js_mkstr
(
js
,
"Stream"
,
6
));
g_stream_ctor
=
js_make_ctor
(
js
,
js_stream_ctor
,
g_stream_proto
,
"Stream"
,
6
);
g_readable_proto
=
js_mkobj
(
js
);
js_set_proto_init
(
g_readable_proto
,
g_stream_proto
);
js_set
(
js
,
g_readable_proto
,
"_read"
,
js_mkfun
(
js_readable__read
));
js_set
(
js
,
g_readable_proto
,
"push"
,
js_mkfun
(
js_readable_push
));
js_set
(
js
,
g_readable_proto
,
"read"
,
js_mkfun
(
js_readable_read
));
js_set
(
js
,
g_readable_proto
,
"setEncoding"
,
js_mkfun
(
js_readable_set_encoding
));
js_set
(
js
,
g_readable_proto
,
"on"
,
js_mkfun
(
js_readable_on
));
js_set
(
js
,
g_readable_proto
,
"resume"
,
js_mkfun
(
js_readable_resume
));
js_set
(
js
,
g_readable_proto
,
"pause"
,
js_mkfun
(
js_readable_pause
));
js_set_sym
(
js
,
g_readable_proto
,
get_toStringTag_sym
(),
js_mkstr
(
js
,
"Readable"
,
8
));
g_readable_ctor
=
js_make_ctor
(
js
,
js_readable_ctor
,
g_readable_proto
,
"Readable"
,
8
);
js_set
(
js
,
g_readable_ctor
,
"from"
,
js_mkfun
(
js_readable_from
));
js_set
(
js
,
g_readable_ctor
,
"fromWeb"
,
js_mkfun
(
js_readable_from_web
));
g_writable_proto
=
js_mkobj
(
js
);
js_set_proto_init
(
g_writable_proto
,
g_stream_proto
);
js_set
(
js
,
g_writable_proto
,
"_write"
,
js_mkfun
(
js_writable__write
));
js_set
(
js
,
g_writable_proto
,
"_final"
,
js_mkfun
(
js_writable__final
));
js_set
(
js
,
g_writable_proto
,
"write"
,
js_mkfun
(
js_writable_write
));
js_set
(
js
,
g_writable_proto
,
"end"
,
js_mkfun
(
js_writable_end
));
js_set
(
js
,
g_writable_proto
,
"cork"
,
js_mkfun
(
stream_noop
));
js_set
(
js
,
g_writable_proto
,
"uncork"
,
js_mkfun
(
stream_noop
));
js_set_sym
(
js
,
g_writable_proto
,
get_toStringTag_sym
(),
js_mkstr
(
js
,
"Writable"
,
8
));
g_writable_ctor
=
js_make_ctor
(
js
,
js_writable_ctor
,
g_writable_proto
,
"Writable"
,
8
);
g_duplex_proto
=
js_mkobj
(
js
);
js_set_proto_init
(
g_duplex_proto
,
g_readable_proto
);
js_set
(
js
,
g_duplex_proto
,
"_write"
,
js_mkfun
(
js_writable__write
));
js_set
(
js
,
g_duplex_proto
,
"_final"
,
js_mkfun
(
js_writable__final
));
js_set
(
js
,
g_duplex_proto
,
"write"
,
js_mkfun
(
js_writable_write
));
js_set
(
js
,
g_duplex_proto
,
"end"
,
js_mkfun
(
js_writable_end
));
js_set
(
js
,
g_duplex_proto
,
"cork"
,
js_mkfun
(
stream_noop
));
js_set
(
js
,
g_duplex_proto
,
"uncork"
,
js_mkfun
(
stream_noop
));
js_set_sym
(
js
,
g_duplex_proto
,
get_toStringTag_sym
(),
js_mkstr
(
js
,
"Duplex"
,
6
));
g_duplex_ctor
=
js_make_ctor
(
js
,
js_duplex_ctor
,
g_duplex_proto
,
"Duplex"
,
6
);
g_transform_proto
=
js_mkobj
(
js
);
js_set_proto_init
(
g_transform_proto
,
g_duplex_proto
);
js_set
(
js
,
g_transform_proto
,
"_transform"
,
js_mkfun
(
js_transform__transform
));
js_set
(
js
,
g_transform_proto
,
"_write"
,
js_mkfun
(
js_transform__write
));
js_set
(
js
,
g_transform_proto
,
"_final"
,
js_mkfun
(
js_transform__final
));
js_set_sym
(
js
,
g_transform_proto
,
get_toStringTag_sym
(),
js_mkstr
(
js
,
"Transform"
,
9
));
g_transform_ctor
=
js_make_ctor
(
js
,
js_transform_ctor
,
g_transform_proto
,
"Transform"
,
9
);
g_passthrough_proto
=
js_mkobj
(
js
);
js_set_proto_init
(
g_passthrough_proto
,
g_transform_proto
);
js_set
(
js
,
g_passthrough_proto
,
"_transform"
,
js_mkfun
(
js_passthrough__transform
));
js_set_sym
(
js
,
g_passthrough_proto
,
get_toStringTag_sym
(),
js_mkstr
(
js
,
"PassThrough"
,
11
));
g_passthrough_ctor
=
js_make_ctor
(
js
,
js_passthrough_ctor
,
g_passthrough_proto
,
"PassThrough"
,
11
);
gc_register_root
(
&
g_stream_proto
);
gc_register_root
(
&
g_stream_ctor
);
gc_register_root
(
&
g_readable_proto
);
gc_register_root
(
&
g_readable_ctor
);
gc_register_root
(
&
g_writable_proto
);
gc_register_root
(
&
g_writable_ctor
);
gc_register_root
(
&
g_duplex_proto
);
gc_register_root
(
&
g_duplex_ctor
);
gc_register_root
(
&
g_transform_proto
);
gc_register_root
(
&
g_transform_ctor
);
gc_register_root
(
&
g_passthrough_proto
);
gc_register_root
(
&
g_passthrough_ctor
);
}
ant_value_t
stream_readable_constructor
(
ant_t
*
js
)
{
stream_init_constructors
(
js
);
return
g_readable_ctor
;
}
ant_value_t
stream_writable_constructor
(
ant_t
*
js
)
{
stream_init_constructors
(
js
);
return
g_writable_ctor
;
}
ant_value_t
stream_readable_prototype
(
ant_t
*
js
)
{
stream_init_constructors
(
js
);
return
g_readable_proto
;
}
ant_value_t
stream_writable_prototype
(
ant_t
*
js
)
{
stream_init_constructors
(
js
);
return
g_writable_proto
;
}
ant_value_t
stream_duplex_prototype
(
ant_t
*
js
)
{
stream_init_constructors
(
js
);
return
g_duplex_proto
;
}
ant_value_t
stream_construct_readable
(
ant_t
*
js
,
ant_value_t
base_proto
,
ant_value_t
options
)
{
stream_init_constructors
(
js
);
return
stream_construct
(
js
,
base_proto
,
options
,
stream_init_readable
);
}
ant_value_t
stream_construct_writable
(
ant_t
*
js
,
ant_value_t
base_proto
,
ant_value_t
options
)
{
stream_init_constructors
(
js
);
return
stream_construct
(
js
,
base_proto
,
options
,
stream_init_writable
);
}
void
stream_init_readable_object
(
ant_t
*
js
,
ant_value_t
obj
,
ant_value_t
options
)
{
stream_init_constructors
(
js
);
if
(
!
is_object_type
(
obj
))
return
;
js_set_native_tag
(
obj
,
STREAM_NATIVE_TAG
);
stream_init_readable
(
js
,
obj
,
options
);
}
void
stream_init_writable_object
(
ant_t
*
js
,
ant_value_t
obj
,
ant_value_t
options
)
{
stream_init_constructors
(
js
);
if
(
!
is_object_type
(
obj
))
return
;
js_set_native_tag
(
obj
,
STREAM_NATIVE_TAG
);
stream_init_writable
(
js
,
obj
,
options
);
}
void
stream_init_duplex_object
(
ant_t
*
js
,
ant_value_t
obj
,
ant_value_t
options
)
{
stream_init_constructors
(
js
);
if
(
!
is_object_type
(
obj
))
return
;
js_set_native_tag
(
obj
,
STREAM_NATIVE_TAG
);
stream_init_readable
(
js
,
obj
,
options
);
stream_init_writable
(
js
,
obj
,
options
);
}
ant_value_t
stream_readable_push
(
ant_t
*
js
,
ant_value_t
stream_obj
,
ant_value_t
chunk
,
ant_value_t
encoding
)
{
stream_init_constructors
(
js
);
return
stream_readable_push_value
(
js
,
stream_obj
,
chunk
,
encoding
);
}
static
ant_value_t
js_stream_get_default_high_water_mark
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
bool
object_mode
=
nargs
>
0
&&
js_truthy
(
js
,
args
[
0
]);
return
js_mknum
(
stream_default_high_water_mark
(
object_mode
));
}
static
ant_value_t
js_stream_set_default_high_water_mark
(
ant_t
*
js
,
ant_value_t
*
args
,
int
nargs
)
{
if
(
nargs
<
2
||
vtype
(
args
[
1
])
!=
T_NUM
||
js_getnum
(
args
[
1
])
<
0
)
return
js_mkerr_typed
(
js
,
JS_ERR_RANGE
,
"setDefaultHighWaterMark requires a non-negative number"
);
bool
object_mode
=
js_truthy
(
js
,
args
[
0
]);
if
(
object_mode
)
g_default_object_high_water_mark
=
js_getnum
(
args
[
1
]);
else
g_default_high_water_mark
=
js_getnum
(
args
[
1
]);
return
js_mkundef
();
}
static
void
stream_web_copy_global
(
ant_t
*
js
,
ant_value_t
obj
,
const
char
*
name
)
{
ant_value_t
value
=
js_get
(
js
,
js
->
global
,
name
);
if
(
is_err
(
value
))
return
;
js_set
(
js
,
obj
,
name
,
value
);
}
// TODO: remove copy-on-start
static
void
stream_web_define_common
(
ant_t
*
js
,
ant_value_t
obj
)
{
stream_web_copy_global
(
js
,
obj
,
"ReadableStream"
);
stream_web_copy_global
(
js
,
obj
,
"ReadableStreamDefaultReader"
);
stream_web_copy_global
(
js
,
obj
,
"ReadableStreamDefaultController"
);
stream_web_copy_global
(
js
,
obj
,
"WritableStream"
);
stream_web_copy_global
(
js
,
obj
,
"WritableStreamDefaultWriter"
);
stream_web_copy_global
(
js
,
obj
,
"WritableStreamDefaultController"
);
stream_web_copy_global
(
js
,
obj
,
"TransformStream"
);
stream_web_copy_global
(
js
,
obj
,
"TransformStreamDefaultController"
);
stream_web_copy_global
(
js
,
obj
,
"ByteLengthQueuingStrategy"
);
stream_web_copy_global
(
js
,
obj
,
"CountQueuingStrategy"
);
stream_web_copy_global
(
js
,
obj
,
"TextEncoderStream"
);
stream_web_copy_global
(
js
,
obj
,
"TextDecoderStream"
);
stream_web_copy_global
(
js
,
obj
,
"CompressionStream"
);
stream_web_copy_global
(
js
,
obj
,
"DecompressionStream"
);
}
ant_value_t
stream_library
(
ant_t
*
js
)
{
ant_value_t
lib
=
js_mkobj
(
js
);
ant_value_t
promises
=
js_mkobj
(
js
);
stream_init_constructors
(
js
);
js_set
(
js
,
promises
,
"pipeline"
,
js_mkfun
(
js_stream_promises_pipeline
));
js_set
(
js
,
promises
,
"finished"
,
js_mkfun
(
js_stream_promises_finished
));
js_set_module_default
(
js
,
lib
,
g_stream_ctor
,
"Stream"
);
js_set
(
js
,
lib
,
"Readable"
,
g_readable_ctor
);
js_set
(
js
,
lib
,
"Writable"
,
g_writable_ctor
);
js_set
(
js
,
lib
,
"Duplex"
,
g_duplex_ctor
);
js_set
(
js
,
lib
,
"Transform"
,
g_transform_ctor
);
js_set
(
js
,
lib
,
"PassThrough"
,
g_passthrough_ctor
);
js_set
(
js
,
lib
,
"pipeline"
,
js_mkfun
(
js_stream_pipeline
));
js_set
(
js
,
lib
,
"finished"
,
js_mkfun
(
js_stream_finished
));
js_set
(
js
,
lib
,
"getDefaultHighWaterMark"
,
js_mkfun
(
js_stream_get_default_high_water_mark
));
js_set
(
js
,
lib
,
"setDefaultHighWaterMark"
,
js_mkfun
(
js_stream_set_default_high_water_mark
));
js_set
(
js
,
lib
,
"promises"
,
promises
);
js_set
(
js
,
g_stream_ctor
,
"Readable"
,
g_readable_ctor
);
js_set
(
js
,
g_stream_ctor
,
"Writable"
,
g_writable_ctor
);
js_set
(
js
,
g_stream_ctor
,
"Duplex"
,
g_duplex_ctor
);
js_set
(
js
,
g_stream_ctor
,
"Transform"
,
g_transform_ctor
);
js_set
(
js
,
g_stream_ctor
,
"PassThrough"
,
g_passthrough_ctor
);
js_set
(
js
,
g_stream_ctor
,
"pipeline"
,
js_get
(
js
,
lib
,
"pipeline"
));
js_set
(
js
,
g_stream_ctor
,
"finished"
,
js_get
(
js
,
lib
,
"finished"
));
js_set
(
js
,
g_stream_ctor
,
"getDefaultHighWaterMark"
,
js_get
(
js
,
lib
,
"getDefaultHighWaterMark"
));
js_set
(
js
,
g_stream_ctor
,
"setDefaultHighWaterMark"
,
js_get
(
js
,
lib
,
"setDefaultHighWaterMark"
));
js_set
(
js
,
g_stream_ctor
,
"promises"
,
promises
);
js_set
(
js
,
promises
,
"default"
,
promises
);
js_set_slot_wb
(
js
,
promises
,
SLOT_DEFAULT
,
promises
);
js_set_sym
(
js
,
lib
,
get_toStringTag_sym
(),
js_mkstr
(
js
,
"stream"
,
6
));
return
lib
;
}
ant_value_t
stream_promises_library
(
ant_t
*
js
)
{
ant_value_t
stream_ns
=
js_esm_import_sync_cstr
(
js
,
"stream"
,
6
);
ant_value_t
promises
=
0
;
if
(
is_err
(
stream_ns
))
return
stream_ns
;
promises
=
js_get
(
js
,
stream_ns
,
"promises"
);
if
(
!
is_object_type
(
promises
))
return
js_mkerr
(
js
,
"stream.promises is not available"
);
js_set
(
js
,
promises
,
"default"
,
promises
);
js_set_slot_wb
(
js
,
promises
,
SLOT_DEFAULT
,
promises
);
return
promises
;
}
ant_value_t
stream_web_library
(
ant_t
*
js
)
{
ant_value_t
lib
=
js_mkobj
(
js
);
stream_web_define_common
(
js
,
lib
);
js_set
(
js
,
lib
,
"default"
,
lib
);
js_set_slot_wb
(
js
,
lib
,
SLOT_DEFAULT
,
lib
);
js_set_sym
(
js
,
lib
,
get_toStringTag_sym
(),
js_mkstr
(
js
,
"stream/web"
,
10
));
return
lib
;
}
File Metadata
Details
Attached
Mime Type
text/x-c
Expires
Fri, May 1, 2:20 PM (2 d)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
541659
Default Alt Text
stream.c (72 KB)
Attached To
Mode
rANT Ant
Attached
Detach File
Event Timeline
Log In to Comment