Skip to content

Commit

Permalink
Add optional frame to is_ready.
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Sep 24, 2023
1 parent 88380cf commit 373373c
Show file tree
Hide file tree
Showing 93 changed files with 228 additions and 171 deletions.
4 changes: 3 additions & 1 deletion src/core/builtins/builtins_ffmpeg_filters.ml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ let initialized graph =

let is_ready graph =
(not graph.failed)
&& Queue.fold (fun cur s -> cur && s#is_ready) true graph.graph_inputs
&& Queue.fold
(fun cur (s : Source.source) -> cur && s#is_ready ())
true graph.graph_inputs

let pull graph = (Clock.get (Queue.peek graph.graph_inputs)#clock)#end_tick

Expand Down
6 changes: 3 additions & 3 deletions src/core/builtins/builtins_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ let _ =
available), or currently streaming."
[("", Lang.source_t (Lang.univ_t ()), None, None)]
Lang.bool_t
(fun p -> Lang.bool (Lang.to_source (List.assoc "" p))#is_ready)
(fun p -> Lang.bool ((Lang.to_source (List.assoc "" p))#is_ready ()))

let _ =
Lang.add_builtin ~base:source "is_up" ~category:`System
Expand Down Expand Up @@ -211,7 +211,7 @@ let _ =
Clock.unify ~pos:fo#pos fo#clock (Clock.create_known clock);
ignore (clock#start_outputs (fun _ -> true) ());
log#info "Start dumping source (ratio: %.02fx)" ratio;
while (not (Atomic.get should_stop)) && fo#is_ready do
while (not (Atomic.get should_stop)) && fo#is_ready () do
let start_time = Time.time () in
clock#end_tick;
sleep_until (start_time |+| latency)
Expand Down Expand Up @@ -252,7 +252,7 @@ let _ =
Clock.unify ~pos:o#pos o#clock (Clock.create_known clock);
ignore (clock#start_outputs (fun _ -> true) ());
log#info "Start dropping source (ratio: %.02fx)" ratio;
while (not (Atomic.get should_stop)) && o#is_ready do
while (not (Atomic.get should_stop)) && o#is_ready () do
let start_time = Time.time () in
clock#end_tick;
sleep_until (start_time |+| latency)
Expand Down
4 changes: 2 additions & 2 deletions src/core/clock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ module MkClock (Time : Liq_time.T) = struct
match sync with
| `Auto ->
List.exists
(fun (state, s) ->
state = `Active && snd s#self_sync && s#is_ready)
(fun (state, (s : Source.active_source)) ->
state = `Active && snd s#self_sync && s#is_ready ())
outputs
| `CPU -> false
| `None -> true
Expand Down
2 changes: 1 addition & 1 deletion src/core/conversions/conversion.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class base ?(audio = false) ?(video = false) ?(midi = false) ~converter
(source : Source.source) =
object (self)
method stype = source#stype
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method abort_track = source#abort_track
method remaining = source#remaining
method seek = source#seek
Expand Down
2 changes: 1 addition & 1 deletion src/core/conversions/swap.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class swap ~field (source : source) =
object
inherit operator [source] ~name:"swap"
method stype = source#stype
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method remaining = source#remaining
method abort_track = source#abort_track
method seek = source#seek
Expand Down
4 changes: 2 additions & 2 deletions src/core/io/ffmpeg_filter_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class virtual ['a] base_output ~pass_metadata ~name ~frame_t ~field source =
method stop = ()
method! reset = ()
val mutable is_up = false
method! is_ready = is_up && super#is_ready
method! is_ready ?frame () = is_up && super#is_ready ?frame ()

method! wake_up l =
is_up <- true;
Expand Down Expand Up @@ -247,7 +247,7 @@ class virtual ['a] input_base ~name ~pass_metadata ~self_sync_type ~self_sync
f ()
with Not_ready -> ()

method is_ready =
method private _is_ready ?frame:_ _ =
Generator.length self#buffer >= Lazy.force Frame.size || is_ready ()

method private get_frame frame =
Expand Down
2 changes: 1 addition & 1 deletion src/core/io/ffmpeg_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug
method remaining = -1
method abort_track = Generator.add_track_mark self#buffer
method private is_connected = Atomic.get container <> None
method! is_ready = super#is_ready && self#is_connected
method! is_ready ?frame () = super#is_ready ?frame () && self#is_connected

method private get_self_sync =
match self_sync () with Some v -> v | None -> false
Expand Down
2 changes: 1 addition & 1 deletion src/core/io/gstreamer_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ class audio_video_input p (pipeline, audio_pipeline, video_pipeline) =
(* Source is ready when ready = true and gst has some audio or some video. *)
val mutable ready = true

method is_ready =
method private _is_ready =
let pending = function
| Some sink -> sink.pending () > 0
| None -> false
Expand Down
4 changes: 2 additions & 2 deletions src/core/io/srt_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,8 @@ class virtual input_base ~max ~clock_safe ~on_connect ~on_disconnect
method remaining = -1
method abort_track = Generator.add_track_mark self#buffer

method! is_ready =
super#is_ready && (not self#should_stop) && self#is_connected
method! is_ready ?frame () =
super#is_ready ?frame () && (not self#should_stop) && self#is_connected

method self_sync = (`Dynamic, self#is_connected)

Expand Down
2 changes: 1 addition & 1 deletion src/core/lang_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ let source_methods =
"Indicate if a source is ready to stream. This does not mean that the \
source is currently streaming, just that its resources are all properly \
initialized.",
fun s -> val_fun [] (fun _ -> bool s#is_ready) );
fun (s : Source.source) -> val_fun [] (fun _ -> bool (s#is_ready ())) );
( "buffered",
([], fun_t [] (list_t (product_t string_t float_t))),
"Length of buffered data.",
Expand Down
6 changes: 3 additions & 3 deletions src/core/operators/accelerate.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class accelerate ~ratio ~randomize source_val =
source#get_ready [(self :> source)]

method! private sleep = source#leave (self :> source)
method is_ready = source#is_ready
method private _is_ready = source#is_ready

(** Filled ticks. *)
val mutable filled = 0
Expand Down Expand Up @@ -75,13 +75,13 @@ class accelerate ~ratio ~randomize source_val =
let pos = ref 1 in
(* Drop frames if we are late. *)
(* TODO: we could also duplicate if we are in advance. *)
while !pos > 0 && self#must_drop && source#is_ready do
while !pos > 0 && self#must_drop && source#is_ready ~frame () do
Frame.clear null;
self#child_on_output (fun () -> source#get null);
pos := Frame.position null;
skipped <- skipped + !pos
done;
if source#is_ready then (
if source#is_ready ~frame () then (
let before = Frame.position frame in
if !pos > 0 then self#child_on_output (fun () -> source#get frame);
let after = Frame.position frame in
Expand Down
16 changes: 10 additions & 6 deletions src/core/operators/add.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@ class virtual base ~name tracks =
List.fold_left f (-1)
(List.map
(fun s -> s#remaining)
(List.filter (fun s -> s#is_ready) sources))
(List.filter (fun (s : Source.source) -> s#is_ready ()) sources))

method abort_track = List.iter (fun s -> s#abort_track) sources
method is_ready = List.exists (fun s -> s#is_ready) sources

method private _is_ready ?frame () =
List.exists (fun s -> s#is_ready ?frame ()) sources

method seek n = match sources with [s] -> s#seek n | _ -> 0

method seek_source =
Expand Down Expand Up @@ -89,7 +92,7 @@ class virtual base ~name tracks =
match
List.fold_left
(fun cur { fields; source } ->
if not source#is_ready then cur
if not (source#is_ready ~frame:buf ()) then cur
else
List.fold_left
(fun cur { position; _ } ->
Expand Down Expand Up @@ -136,8 +139,8 @@ class audio_add ~renorm ~power ~field tracks =
(0., []) fields
in
( total_weight +. source_weight,
if source#is_ready then { source; fields } :: tracks else tracks
))
if source#is_ready ~frame:buf () then { source; fields } :: tracks
else tracks ))
(0., []) tracks
in
let total_weight = if power then sqrt total_weight else total_weight in
Expand Down Expand Up @@ -171,7 +174,8 @@ class video_add ~field ~add tracks =
let tracks =
List.fold_left
(fun tracks track ->
if track.source#is_ready then track :: tracks else tracks)
if track.source#is_ready ~frame:buf () then track :: tracks
else tracks)
[] tracks
in
let offset = Frame.position buf in
Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/amplify.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class amplify ~field (source : source) override_field coeff =
inherit operator ~name:"track.audio.amplify" [source]
val mutable override = None
method stype = source#stype
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method remaining = source#remaining
method abort_track = source#abort_track
method seek = source#seek
Expand Down
4 changes: 2 additions & 2 deletions src/core/operators/available.ml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ class available ~track_sensitive ~override p (source : source) =
method self_sync = source#self_sync
val mutable ready = p ()

method is_ready =
method private _is_ready ?frame () =
if not (track_sensitive () && ready) then ready <- p ();
ready && (override || source#is_ready)
ready && (override || source#is_ready ?frame ())

method private get_frame buf =
source#get buf;
Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/biquad_filter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class biquad (source : source) filter_type freq q gain =
method seek = source#seek
method seek_source = source
method self_sync = source#self_sync
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method abort_track = source#abort_track

method! wake_up a =
Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/chord.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class chord metadata_name (source : source) =
inherit operator ~name:"chord" [source]
method stype = source#stype
method remaining = source#remaining
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method abort_track = source#abort_track
method seek = source#seek
method seek_source = source
Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/clip.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class clip ~field (source : source) =
method remaining = source#remaining
method seek = source#seek
method seek_source = source
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method abort_track = source#abort_track
method self_sync = source#self_sync

Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/comb.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class comb ~field (source : source) delay feedback =
method seek = source#seek
method seek_source = source
method self_sync = source#self_sync
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method abort_track = source#abort_track
val mutable past = Audio.make 0 0 0.

Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/compand.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class compand ~field (source : source) mu =
method seek = source#seek
method seek_source = source
method self_sync = source#self_sync
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method abort_track = source#abort_track

method private get_frame buf =
Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/compress.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class compress ~attack ~release ~threshold ~ratio ~knee ~track_sensitive
method seek = source#seek
method seek_source = source
method self_sync = source#self_sync
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method abort_track = source#abort_track

(* Current gain in dB. *)
Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/compress_exp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class compress ~field (source : source) mu =
inherit operator ~name:"compress" [source]
method stype = source#stype
method remaining = source#remaining
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method abort_track = source#abort_track
method seek = source#seek
method seek_source = source
Expand Down
25 changes: 15 additions & 10 deletions src/core/operators/cross.ml
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,17 @@ class cross val_source ~duration_getter ~override_duration ~persist_override
| `Limit ->
(* The track finished.
* We compute rms_after and launch the transition. *)
if source#is_ready then self#analyze_after;
if source#is_ready ~frame () then self#analyze_after;
self#create_transition;

(* Check if the new source is ready *)
if (Option.get transition_source)#is_ready then self#get_frame frame
if (Option.get transition_source)#is_ready ~frame () then
self#get_frame frame
else
(* If not, finish this track, which requires our callers
* to wait that we become ready again. *)
Frame.add_break frame (Frame.position frame)
| `After when (Option.get transition_source)#is_ready ->
| `After when (Option.get transition_source)#is_ready ~frame () ->
self#child_get (Option.get transition_source) frame;

if Generator.length pending_after = 0 && Frame.is_partial frame then (
Expand All @@ -225,7 +226,7 @@ class cross val_source ~duration_getter ~override_duration ~persist_override
* using it. Each call to [get_frame] must add exactly one break so
* call it again and then remove the intermediate break that was just
* added. *)
if source#is_ready then (
if source#is_ready ~frame () then (
self#get_frame frame;
Frame.set_breaks frame
(match List.rev (Frame.breaks frame) with
Expand Down Expand Up @@ -294,8 +295,10 @@ class cross val_source ~duration_getter ~override_duration ~persist_override
rmsi_after <- rmsi_after + len);
self#save_last_metadata `After buf_frame;
self#update_cross_length buf_frame start;
if AFrame.is_partial buf_frame && not source#is_ready then
Generator.add_track_mark gen_after
if
AFrame.is_partial buf_frame
&& not (source#is_ready ~frame:buf_frame ())
then Generator.add_track_mark gen_after
else (
if not (Frame.is_partial buf_frame) then Frame.clear buf_frame;
if after_len < before_len then f ())
Expand Down Expand Up @@ -396,14 +399,16 @@ class cross val_source ~duration_getter ~override_duration ~persist_override
| `After -> Option.get transition_source
| _ -> (self :> Source.source)

method is_ready =
method private _is_ready ?frame () =
match status with
| `Idle | `Before -> source#is_ready
| `Idle | `Before -> source#is_ready ?frame ()
| `Limit -> true
| `After -> (Option.get transition_source)#is_ready || source#is_ready
| `After ->
(Option.get transition_source)#is_ready ?frame ()
|| source#is_ready ?frame ()

method abort_track =
if status = `After && (Option.get transition_source)#is_ready then
if status = `After && (Option.get transition_source)#is_ready () then
(Option.get transition_source)#abort_track
else source#abort_track
end
Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/cuepoint.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class cue_cut ~m_cue_in ~m_cue_out ~on_cue_in ~on_cue_out source_val =
inherit! Child_support.base ~check_self_sync:true [source_val]
val mutable track_state : state = `Idle
method stype = source#stype
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method abort_track = source#abort_track
method self_sync = source#self_sync
method seek = source#seek
Expand Down
8 changes: 5 additions & 3 deletions src/core/operators/defer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class defer ~delay ~overhead ~field source =
let tmp_frame = self#tmp_frame in
Frame.clear tmp_frame;

while Frame.is_partial tmp_frame && source#is_ready do
while Frame.is_partial tmp_frame && source#is_ready ~frame:tmp_frame () do
source#get tmp_frame
done;
let gen_len = Frame.position tmp_frame in
Expand Down Expand Up @@ -138,7 +138,7 @@ class defer ~delay ~overhead ~field source =
method private queue_output =
let clock = Source.Clock_variables.get self#clock in
clock#on_output (fun () ->
if source#is_ready then self#buffer_data;
if source#is_ready () then self#buffer_data;
if should_queue then
clock#on_before_output (fun () -> self#queue_output))

Expand All @@ -148,7 +148,9 @@ class defer ~delay ~overhead ~field source =
self#queue_output);
self#on_sleep (fun () -> should_queue <- false)

method is_ready = (not deferred) && Generator.length self#generator > 0
method private _is_ready ?frame:_ _ =
(not deferred) && Generator.length self#generator > 0

method private get_frame buf = Generator.fill self#generator buf
end

Expand Down
4 changes: 2 additions & 2 deletions src/core/operators/delay.ml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class delay ~initial (source : source) delay =
in_track <- false;
last <- Unix.time ()

method is_ready =
let is_ready = source#is_ready in
method private _is_ready ?frame () =
let is_ready = source#is_ready ?frame () in
if in_track && not is_ready then self#end_track;
self#delay_ok && is_ready

Expand Down
2 changes: 1 addition & 1 deletion src/core/operators/delay_line.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class delay (source : source) duration =
inherit operator ~name:"amplify" [source] as super
val mutable override = None
method stype = source#stype
method is_ready = source#is_ready
method private _is_ready = source#is_ready
method remaining = source#remaining
method abort_track = source#abort_track
method seek = source#seek
Expand Down
Loading

0 comments on commit 373373c

Please sign in to comment.