Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resampled buffer #131

Merged
merged 10 commits into from
Nov 19, 2013
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ operators = \
operators/dyn_op.ml operators/video_effects.ml operators/video_fade.ml \
operators/noblank.ml operators/compand.ml \
operators/mixing_table.ml operators/prepend.ml \
operators/midi_routing.ml \
operators/midi_routing.ml operators/sleeper.ml \
operators/time_warp.ml operators/resample.ml \
operators/chord.ml operators/video_text.ml operators/rms_op.ml

Expand Down
69 changes: 69 additions & 0 deletions src/operators/sleeper.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
(*****************************************************************************

Liquidsoap, a programmable audio stream generator.
Copyright 2003-2013 Savonet team

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details, fully stated in the COPYING
file at the root of the liquidsoap distribution.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

*****************************************************************************)

open Source

class map ~kind source delay random die =
let dt = AFrame.duration () in
object (self)
inherit operator kind [source] as super

val mutable lived = 0.

method stype = source#stype
method remaining = source#remaining
method is_ready = source#is_ready
method abort_track = source#abort_track

method private get_frame buf =
source#get buf;
let delay = delay +. Random.float random in
Thread.delay delay;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this, it's a nice way to test buffers!

lived <- lived +. max dt delay;
if die >= 0. && lived >= die then
while true do Thread.delay 60. done
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry to insist but this should be using is_ready = false ^^

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless it's the expected thing, i.e. to block the whole streaming thread..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's the expected thing: the operator is pretending to not be able to stream fast enough. If you become not ready, it's an end of track, not just a slow stream.

end

let () =
let k = Lang.kind_type_of_kind_format ~fresh:1 Lang.any_fixed in
Lang.add_operator "sleeper"
[
"delay", Lang.float_t, Some (Lang.float 1.),
Some "Amount of time to sleep at each frame, the unit being the frame length.";
"random", Lang.float_t, Some (Lang.float 0.),
Some "Maximal random amount of time added (unit is frame length).";
"die", Lang.float_t, Some (Lang.float (-1.)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of "die", "stall" or "freeze" seem more appropriate.

Some "Die after given amount of time (don't die if negative).";
"", Lang.source_t k, None, None
]
~kind:(Lang.Unconstrained k)
~descr:"Sleep at each frame. Useful for emulating network delays, etc."
~category:Lang.SoundProcessing
~flags:[Lang.Hidden; Lang.Experimental]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure it should be hidden. Perhaps we need a new Debug category..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I really don't want any of our users to end up trying this...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the purpose of this operator is to test the adaptative buffer, it would be good to add a test to our test suite.

(fun p kind ->
let delay = Lang.to_float (List.assoc "delay" p) in
let delay = AFrame.duration () *. delay in
let random = Lang.to_float (List.assoc "random" p) in
let random = AFrame.duration () *. random in
let die = Lang.to_float (List.assoc "die" p) in
let src = Lang.to_source (List.assoc "" p) in
new map ~kind src delay random die)
212 changes: 212 additions & 0 deletions src/operators/time_warp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,215 @@ let () =
Buffer.create
~infallible ~autostart ~on_start ~on_stop
~pre_buffer ~max_buffer ~kind s)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be in its own adaptative_buffer.ml ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, it's better to keep this in the same file as Buffer, so that we can start thinking how we could share code between the two on a long lonely winter night.

module AdaptativeBuffer =
struct
module RB = Audio.Ringbuffer
module MG = Generator.Metadata

(* The kind of value shared by a producer and a consumer. *)
(* TODO: also have breaks and metadata as in generators. *)
type control = {
lock : Mutex.t;
rb : RB.t;
mutable rb_length : float; (* average length of the ringbuffer in samples *)
mg : MG.t;
mutable buffering : bool;
mutable abort : bool;
}

let proceed control f =
Tutils.mutexify control.lock f ()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!


(** The source which produces data by reading the buffer. *)
class producer ~kind ~pre_buffer ~averaging source c =
let channels = (Frame.type_of_kind kind).Frame.audio in
let prebuf = float (Frame.audio_of_seconds pre_buffer) in
(* see get_frame for an explanation *)
let alpha = log 2. *. AFrame.duration () /. averaging in
object (self)
inherit Source.source kind ~name:"buffer.adaptative_producer"

method stype = Source.Fallible

method remaining =
proceed c (fun () -> MG.remaining c.mg)

method is_ready =
proceed c (fun () -> not c.buffering)

method private get_frame frame =
proceed c
(fun () ->
assert (not c.buffering);

(* Update the average length of the ringbuffer (with a damping
coefficient in order not to be too sensitive to quick local
variations).*)
(* y: average length, dt: frame duration, x: read length, A=a/dt

y(t+dt)=(1-a)y(t)+ax(t)
y'(t)=(a/dt)(x(t)-y(t))
y'(t)+Ay(t)=Ax(t)

When x=x0 is constant and we start at y0, the solution is
y(t) = (y0-x0)exp(-At)+x0

half-life is at th = ln(2)/A
we should thus choose alpha = (dt * ln 2)/th
*)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:trollface:

c.rb_length <- (1. -. alpha) *. c.rb_length +. alpha *. float (RB.read_space c.rb);

(* Fill dlen samples of dst using slen samples of the ringbuffer. *)
let fill dst dofs dlen slen =
let slen = min slen (RB.read_space c.rb) in
if slen > 0 then
let src = Audio.create channels slen in
RB.read c.rb src 0 slen;
if slen = dlen then
Audio.blit src 0 dst dofs slen
else
(* TODO: we could do better than nearest interpolation. However,
for slight adaptations the difference should not really be
audible. *)
for c = 0 to channels - 1 do
let srcc = src.(c) in
let dstc = dst.(c) in
for i = 0 to dlen - 1 do
let x = srcc.(i * slen / dlen) in
dstc.(i + dofs) <- x
done
done
in

let ofs = Frame.position frame in
(* Yes, I've seen some cases... *)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this comment mean?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the following script to test new tracks:

s = playlist(id="playlist","~/Music")
s = skip_blank(max_blank=5.,threshold=0.,s)
s = sleeper(delay=1.2,s)
s = mksafe(s)
s = output.dummy(s)
s = buffer.adaptative(averaging=1.,s)
s = mksafe(s)
out(s)

And after a few tracks, I had requests to fill frames with negative Frame.position, which I don't think is really related to our operator... So I just worked around...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Frame.position returns a negative number, it means that someone has added a break at a negative position in that frame. I find it hard to bet that it isn't related to the new operator and generator. In any case, this really needs to be properly investigated.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed...

let ofs = max 0 ofs in
let len =
let len = Lazy.force Frame.size - ofs in
let rem = MG.remaining c.mg in
if rem = -1 then len else min len rem
in
let aofs = Frame.audio_of_master ofs in
let alen = Frame.audio_of_master len in
let buf = AFrame.content_of_type ~channels frame aofs in

(* We scale the reading so that the buffer always approximatively
contains prebuf data. *)
let scaling = c.rb_length /. prebuf in
let scale n = int_of_float (float n *. scaling) in
let salen = scale alen in
fill buf aofs alen salen;
Frame.add_break frame len;

(* Fill in metadata *)
let md = MG.metadata c.mg len in
List.iter (fun (t,m) -> Frame.set_metadata frame (scale t) m) md;
MG.advance c.mg len;
if Frame.is_partial frame then MG.drop_initial_break c.mg;

(* If we should play at 10x we declare that we should buffer again. *)
if RB.read_space c.rb = 0 || scaling < 0.1 then begin
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should scaling be a parameter? (not sure, just asking..)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I meant the cut-off value 0.1

self#log#f 3 "Buffer emptied, start buffering...";
c.buffering <- true
end)

method abort_track =
proceed c (fun () -> c.abort <- true)
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow where abort and buffering are used..?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind found it ^^


class consumer
~autostart ~infallible ~on_start ~on_stop ~pre_buffer
~kind source_val c
=
let channels = (Frame.type_of_kind kind).Frame.audio in
let prebuf = Frame.audio_of_seconds pre_buffer in
object (self)
inherit Output.output
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could probably inherit from output.dummy here and get rid of the output_* methods..

~output_kind:"buffer" ~content_kind:kind
~infallible ~on_start ~on_stop
source_val
autostart

method output_reset = ()
method output_start = ()
method output_stop = ()

val source = Lang.to_source source_val
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, move this to the operator..


method output_send frame =
proceed c
(fun () ->
if c.abort then begin
c.abort <- false;
source#abort_track
end;
let len = AFrame.position frame in
(* TODO: is this ok to start from 0? *)
let buf = AFrame.content_of_type ~channels frame 0 in
if RB.write_space c.rb < len then RB.read_advance c.rb (len - RB.write_space c.rb);
RB.write c.rb buf 0 len;
MG.feed_from_frame c.mg frame;
if RB.read_space c.rb > prebuf then c.buffering <- false
)

end

let create ~autostart ~infallible ~on_start ~on_stop
~pre_buffer ~max_buffer ~averaging ~kind source_val =
let channels = (Frame.type_of_kind kind).Frame.audio in
let control =
{
lock = Mutex.create ();
rb = RB.create channels (Frame.audio_of_seconds max_buffer);
rb_length = float (Frame.audio_of_seconds pre_buffer);
mg = MG.create ();
buffering = true;
abort = false;
}
in
let source = Lang.to_source source_val in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should do that in the operator's parameters' processing I think..

let _ =
new consumer
~autostart ~infallible ~on_start ~on_stop
~kind source_val ~pre_buffer control
in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the consumer will never be used out of the producer's scope, then you could also move its creation in the producer's constructor..

new producer ~kind ~pre_buffer ~averaging source control
end

let () =
let k = Lang.kind_type_of_kind_format ~fresh:1 Lang.audio_any in
Lang.add_operator "buffer.adaptative"
(Output.proto @
["buffer", Lang.float_t, Some (Lang.float 1.),
Some "Amount of data to pre-buffer, in seconds.";
"max", Lang.float_t, Some (Lang.float 10.),
Some "Maximum amount of buffered data, in seconds.";
"averaging", Lang.float_t, Some (Lang.float 30.),
Some "Half-life for the averaging of the buffer size, in seconds.";
"", Lang.source_t k, None, None])
~kind:(Lang.Unconstrained k)
~category:Lang.Liquidsoap
~descr:"Create a buffer between two different clocks. The speed of \
the output is adapted so that no buffer underrun or overrun \
occurs. This wonderful behavior has a cost: the pitch of the \
sound might be changed a little."
~flags:[Lang.Experimental]
(fun p kind ->
let infallible = not (Lang.to_bool (List.assoc "fallible" p)) in
let autostart = Lang.to_bool (List.assoc "start" p) in
let on_start = List.assoc "on_start" p in
let on_stop = List.assoc "on_stop" p in
let on_start () = ignore (Lang.apply ~t:Lang.unit_t on_start []) in
let on_stop () = ignore (Lang.apply ~t:Lang.unit_t on_stop []) in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure you need all these options, they are usually defined on outputs..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean all these callbacks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer side of a buffer is an output, so it's natural to have output callbacks. It is forced by the code, and I don't see much harm to expose them to users. (Though I've never used them.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah no I agree.

let s = List.assoc "" p in
let pre_buffer = Lang.to_float (List.assoc "buffer" p) in
let max_buffer = Lang.to_float (List.assoc "max" p) in
let averaging = Lang.to_float (List.assoc "averaging" p) in
if pre_buffer >= max_buffer then
raise (Lang.Invalid_value
(List.assoc "max" p,
"Maximum buffering inferior to pre-buffering"));
AdaptativeBuffer.create
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Matter of style: you could move all the instantiations to the producer class and call new adaptative_buffer directly here like done in the other cases...

~infallible ~autostart ~on_start ~on_stop
~pre_buffer ~max_buffer ~averaging ~kind s)
90 changes: 90 additions & 0 deletions src/stream/generator.ml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,96 @@ let get g size =

end

(* TODO: use this in the following modules instead of copying the code... *)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't introduce a TODO in one of our core files... Also, I'm not in favor of a rework of Generator (using Metadata as part of From_frames) now, because I'm not sure to have the time to review this carefully enough, and we don't have the time to test it enough either, with an upcoming release.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm kinda weirded out by the amount of code as well.. It seems to me that there should be better path of least resistance..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I think that this TODO is really deserved:

  • the code for handling metadata is duplicated in all the generators, which is really bad
  • as you remark, we do not want to do this now, in order not to mess up with generators which are a very sensitive part of Liq, so the TODO will help us remember

module Metadata = struct
type t =
{
mutable metadata : (int * Frame.metadata) list;
mutable breaks : int list;
mutable length : int;
}

let create () =
{
metadata = [];
breaks = [];
length = 0;
}

let clear g =
g.metadata <- [];
g.breaks <- [];
g.length <- 0

let advance g len =
g.metadata <- List.map (fun (t,m) -> t-len, m) g.metadata;
g.metadata <- List.filter (fun (t,_) -> t >= 0) g.metadata;
g.breaks <- List.map (fun t -> t-len) g.breaks;
g.breaks <- List.filter (fun t -> t >= 0) g.breaks;
g.length <- g.length - len

let length g = g.length

let remaining g =
match g.breaks with
| a::_ -> a
| _ -> -1

let metadata g len =
List.filter (fun (t,m) -> t < len) g.metadata

let feed_from_frame g frame =
let size = Lazy.force Frame.size in
let length = length g in
g.metadata <-
g.metadata @
(List.map
(fun (t,m) -> length+t, m)
(Frame.get_all_metadata frame));
g.breaks <-
g.breaks @
(List.map
(fun t -> length+t)
(* Filter out the last break, which only marks the end of frame, not a
* track limit (doesn't mean is_partial). *)
(List.filter (fun x -> x < size) (Frame.breaks frame)));
let frame_length =
let rec aux = function
| [t] -> t
| _::tl -> aux tl
| [] -> size
in
aux (Frame.breaks frame)
in
g.length <- g.length + frame_length

let drop_initial_break g =
match g.breaks with
| 0::tl -> g.breaks <- tl
| [] -> () (* end of stream / underrun... *)
| _ -> assert false

let fill g frame =
let offset = Frame.position frame in
let needed =
let size = Lazy.force Frame.size in
let remaining = remaining g in
let remaining = if remaining = -1 then length g else remaining in
min (size-offset) remaining
in
List.iter
(fun (p,m) ->
if p < needed then Frame.set_metadata frame (offset+p) m
) g.metadata;
advance g needed;
(* Mark the end of this filling. If the frame is partial it must be because
* of a break in the generator, or because the generator is emptying.
* Conversely, each break in the generator must cause a partial frame, so
* don't remove any if it isn't partial. *)
Frame.add_break frame (offset+needed);
if Frame.is_partial frame then drop_initial_break g
end

(** Generate a stream, including metadata and breaks.
* The API is based on feeding from frames, and filling frames. *)
module From_frames =
Expand Down
Loading