-
-
Notifications
You must be signed in to change notification settings - Fork 128
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resampled buffer #131
Resampled buffer #131
Changes from 8 commits
debd8ce
03a309c
41cf85e
70641f7
e8cd8d7
574cd83
3cfb3cb
bf0fb8c
968d615
3903f1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
(***************************************************************************** | ||
|
||
Liquidsoap, a programmable audio stream generator. | ||
Copyright 2003-2013 Savonet team | ||
|
||
This program is free software; you can redistribute it and/or modify | ||
it under the terms of the GNU General Public License as published by | ||
the Free Software Foundation; either version 2 of the License, or | ||
(at your option) any later version. | ||
|
||
This program is distributed in the hope that it will be useful, | ||
but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
GNU General Public License for more details, fully stated in the COPYING | ||
file at the root of the liquidsoap distribution. | ||
|
||
You should have received a copy of the GNU General Public License | ||
along with this program; if not, write to the Free Software | ||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA | ||
|
||
*****************************************************************************) | ||
|
||
open Source | ||
|
||
class map ~kind source delay random die = | ||
let dt = AFrame.duration () in | ||
object (self) | ||
inherit operator kind [source] as super | ||
|
||
val mutable lived = 0. | ||
|
||
method stype = source#stype | ||
method remaining = source#remaining | ||
method is_ready = source#is_ready | ||
method abort_track = source#abort_track | ||
|
||
method private get_frame buf = | ||
source#get buf; | ||
let delay = delay +. Random.float random in | ||
Thread.delay delay; | ||
lived <- lived +. max dt delay; | ||
if die >= 0. && lived >= die then | ||
while true do Thread.delay 60. done | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry to insist but this should be using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unless it's the expected thing, i.e. to block the whole streaming thread.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's the expected thing: the operator is pretending to not be able to stream fast enough. If you become not ready, it's an end of track, not just a slow stream. |
||
end | ||
|
||
let () = | ||
let k = Lang.kind_type_of_kind_format ~fresh:1 Lang.any_fixed in | ||
Lang.add_operator "sleeper" | ||
[ | ||
"delay", Lang.float_t, Some (Lang.float 1.), | ||
Some "Amount of time to sleep at each frame, the unit being the frame length."; | ||
"random", Lang.float_t, Some (Lang.float 0.), | ||
Some "Maximal random amount of time added (unit is frame length)."; | ||
"die", Lang.float_t, Some (Lang.float (-1.)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of "die", "stall" or "freeze" seem more appropriate. |
||
Some "Die after given amount of time (don't die if negative)."; | ||
"", Lang.source_t k, None, None | ||
] | ||
~kind:(Lang.Unconstrained k) | ||
~descr:"Sleep at each frame. Useful for emulating network delays, etc." | ||
~category:Lang.SoundProcessing | ||
~flags:[Lang.Hidden; Lang.Experimental] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure it should be hidden. Perhaps we need a new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, I really don't want any of our users to end up trying this... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the purpose of this operator is to test the adaptative buffer, it would be good to add a test to our test suite. |
||
(fun p kind -> | ||
let delay = Lang.to_float (List.assoc "delay" p) in | ||
let delay = AFrame.duration () *. delay in | ||
let random = Lang.to_float (List.assoc "random" p) in | ||
let random = AFrame.duration () *. random in | ||
let die = Lang.to_float (List.assoc "die" p) in | ||
let src = Lang.to_source (List.assoc "" p) in | ||
new map ~kind src delay random die) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -160,3 +160,215 @@ let () = | |
Buffer.create | ||
~infallible ~autostart ~on_start ~on_stop | ||
~pre_buffer ~max_buffer ~kind s) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be in its own There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my opinion, it's better to keep this in the same file as Buffer, so that we can start thinking how we could share code between the two on a long lonely winter night. |
||
module AdaptativeBuffer = | ||
struct | ||
module RB = Audio.Ringbuffer | ||
module MG = Generator.Metadata | ||
|
||
(* The kind of value shared by a producer and a consumer. *) | ||
(* TODO: also have breaks and metadata as in generators. *) | ||
type control = { | ||
lock : Mutex.t; | ||
rb : RB.t; | ||
mutable rb_length : float; (* average length of the ringbuffer in samples *) | ||
mg : MG.t; | ||
mutable buffering : bool; | ||
mutable abort : bool; | ||
} | ||
|
||
let proceed control f = | ||
Tutils.mutexify control.lock f () | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! |
||
|
||
(** The source which produces data by reading the buffer. *) | ||
class producer ~kind ~pre_buffer ~averaging source c = | ||
let channels = (Frame.type_of_kind kind).Frame.audio in | ||
let prebuf = float (Frame.audio_of_seconds pre_buffer) in | ||
(* see get_frame for an explanation *) | ||
let alpha = log 2. *. AFrame.duration () /. averaging in | ||
object (self) | ||
inherit Source.source kind ~name:"buffer.adaptative_producer" | ||
|
||
method stype = Source.Fallible | ||
|
||
method remaining = | ||
proceed c (fun () -> MG.remaining c.mg) | ||
|
||
method is_ready = | ||
proceed c (fun () -> not c.buffering) | ||
|
||
method private get_frame frame = | ||
proceed c | ||
(fun () -> | ||
assert (not c.buffering); | ||
|
||
(* Update the average length of the ringbuffer (with a damping | ||
coefficient in order not to be too sensitive to quick local | ||
variations).*) | ||
(* y: average length, dt: frame duration, x: read length, A=a/dt | ||
|
||
y(t+dt)=(1-a)y(t)+ax(t) | ||
y'(t)=(a/dt)(x(t)-y(t)) | ||
y'(t)+Ay(t)=Ax(t) | ||
|
||
When x=x0 is constant and we start at y0, the solution is | ||
y(t) = (y0-x0)exp(-At)+x0 | ||
|
||
half-life is at th = ln(2)/A | ||
we should thus choose alpha = (dt * ln 2)/th | ||
*) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
c.rb_length <- (1. -. alpha) *. c.rb_length +. alpha *. float (RB.read_space c.rb); | ||
|
||
(* Fill dlen samples of dst using slen samples of the ringbuffer. *) | ||
let fill dst dofs dlen slen = | ||
let slen = min slen (RB.read_space c.rb) in | ||
if slen > 0 then | ||
let src = Audio.create channels slen in | ||
RB.read c.rb src 0 slen; | ||
if slen = dlen then | ||
Audio.blit src 0 dst dofs slen | ||
else | ||
(* TODO: we could do better than nearest interpolation. However, | ||
for slight adaptations the difference should not really be | ||
audible. *) | ||
for c = 0 to channels - 1 do | ||
let srcc = src.(c) in | ||
let dstc = dst.(c) in | ||
for i = 0 to dlen - 1 do | ||
let x = srcc.(i * slen / dlen) in | ||
dstc.(i + dofs) <- x | ||
done | ||
done | ||
in | ||
|
||
let ofs = Frame.position frame in | ||
(* Yes, I've seen some cases... *) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does this comment mean? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I used the following script to test new tracks:
And after a few tracks, I had requests to fill frames with negative There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If Frame.position returns a negative number, it means that someone has added a break at a negative position in that frame. I find it hard to bet that it isn't related to the new operator and generator. In any case, this really needs to be properly investigated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed... |
||
let ofs = max 0 ofs in | ||
let len = | ||
let len = Lazy.force Frame.size - ofs in | ||
let rem = MG.remaining c.mg in | ||
if rem = -1 then len else min len rem | ||
in | ||
let aofs = Frame.audio_of_master ofs in | ||
let alen = Frame.audio_of_master len in | ||
let buf = AFrame.content_of_type ~channels frame aofs in | ||
|
||
(* We scale the reading so that the buffer always approximatively | ||
contains prebuf data. *) | ||
let scaling = c.rb_length /. prebuf in | ||
let scale n = int_of_float (float n *. scaling) in | ||
let salen = scale alen in | ||
fill buf aofs alen salen; | ||
Frame.add_break frame len; | ||
|
||
(* Fill in metadata *) | ||
let md = MG.metadata c.mg len in | ||
List.iter (fun (t,m) -> Frame.set_metadata frame (scale t) m) md; | ||
MG.advance c.mg len; | ||
if Frame.is_partial frame then MG.drop_initial_break c.mg; | ||
|
||
(* If we should play at 10x we declare that we should buffer again. *) | ||
if RB.read_space c.rb = 0 || scaling < 0.1 then begin | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I meant the cut-off value |
||
self#log#f 3 "Buffer emptied, start buffering..."; | ||
c.buffering <- true | ||
end) | ||
|
||
method abort_track = | ||
proceed c (fun () -> c.abort <- true) | ||
end | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I follow where There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nevermind found it ^^ |
||
|
||
class consumer | ||
~autostart ~infallible ~on_start ~on_stop ~pre_buffer | ||
~kind source_val c | ||
= | ||
let channels = (Frame.type_of_kind kind).Frame.audio in | ||
let prebuf = Frame.audio_of_seconds pre_buffer in | ||
object (self) | ||
inherit Output.output | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could probably inherit from |
||
~output_kind:"buffer" ~content_kind:kind | ||
~infallible ~on_start ~on_stop | ||
source_val | ||
autostart | ||
|
||
method output_reset = () | ||
method output_start = () | ||
method output_stop = () | ||
|
||
val source = Lang.to_source source_val | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, move this to the operator.. |
||
|
||
method output_send frame = | ||
proceed c | ||
(fun () -> | ||
if c.abort then begin | ||
c.abort <- false; | ||
source#abort_track | ||
end; | ||
let len = AFrame.position frame in | ||
(* TODO: is this ok to start from 0? *) | ||
let buf = AFrame.content_of_type ~channels frame 0 in | ||
if RB.write_space c.rb < len then RB.read_advance c.rb (len - RB.write_space c.rb); | ||
RB.write c.rb buf 0 len; | ||
MG.feed_from_frame c.mg frame; | ||
if RB.read_space c.rb > prebuf then c.buffering <- false | ||
) | ||
|
||
end | ||
|
||
let create ~autostart ~infallible ~on_start ~on_stop | ||
~pre_buffer ~max_buffer ~averaging ~kind source_val = | ||
let channels = (Frame.type_of_kind kind).Frame.audio in | ||
let control = | ||
{ | ||
lock = Mutex.create (); | ||
rb = RB.create channels (Frame.audio_of_seconds max_buffer); | ||
rb_length = float (Frame.audio_of_seconds pre_buffer); | ||
mg = MG.create (); | ||
buffering = true; | ||
abort = false; | ||
} | ||
in | ||
let source = Lang.to_source source_val in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should do that in the operator's parameters' processing I think.. |
||
let _ = | ||
new consumer | ||
~autostart ~infallible ~on_start ~on_stop | ||
~kind source_val ~pre_buffer control | ||
in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the consumer will never be used out of the producer's scope, then you could also move its creation in the producer's constructor.. |
||
new producer ~kind ~pre_buffer ~averaging source control | ||
end | ||
|
||
let () = | ||
let k = Lang.kind_type_of_kind_format ~fresh:1 Lang.audio_any in | ||
Lang.add_operator "buffer.adaptative" | ||
(Output.proto @ | ||
["buffer", Lang.float_t, Some (Lang.float 1.), | ||
Some "Amount of data to pre-buffer, in seconds."; | ||
"max", Lang.float_t, Some (Lang.float 10.), | ||
Some "Maximum amount of buffered data, in seconds."; | ||
"averaging", Lang.float_t, Some (Lang.float 30.), | ||
Some "Half-life for the averaging of the buffer size, in seconds."; | ||
"", Lang.source_t k, None, None]) | ||
~kind:(Lang.Unconstrained k) | ||
~category:Lang.Liquidsoap | ||
~descr:"Create a buffer between two different clocks. The speed of \ | ||
the output is adapted so that no buffer underrun or overrun \ | ||
occurs. This wonderful behavior has a cost: the pitch of the \ | ||
sound might be changed a little." | ||
~flags:[Lang.Experimental] | ||
(fun p kind -> | ||
let infallible = not (Lang.to_bool (List.assoc "fallible" p)) in | ||
let autostart = Lang.to_bool (List.assoc "start" p) in | ||
let on_start = List.assoc "on_start" p in | ||
let on_stop = List.assoc "on_stop" p in | ||
let on_start () = ignore (Lang.apply ~t:Lang.unit_t on_start []) in | ||
let on_stop () = ignore (Lang.apply ~t:Lang.unit_t on_stop []) in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure you need all these options, they are usually defined on outputs.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean all these callbacks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The consumer side of a buffer is an output, so it's natural to have output callbacks. It is forced by the code, and I don't see much harm to expose them to users. (Though I've never used them.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah no I agree. |
||
let s = List.assoc "" p in | ||
let pre_buffer = Lang.to_float (List.assoc "buffer" p) in | ||
let max_buffer = Lang.to_float (List.assoc "max" p) in | ||
let averaging = Lang.to_float (List.assoc "averaging" p) in | ||
if pre_buffer >= max_buffer then | ||
raise (Lang.Invalid_value | ||
(List.assoc "max" p, | ||
"Maximum buffering inferior to pre-buffering")); | ||
AdaptativeBuffer.create | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Matter of style: you could move all the instantiations to the producer class and call |
||
~infallible ~autostart ~on_start ~on_stop | ||
~pre_buffer ~max_buffer ~averaging ~kind s) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -168,6 +168,96 @@ let get g size = | |
|
||
end | ||
|
||
(* TODO: use this in the following modules instead of copying the code... *) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please don't introduce a TODO in one of our core files... Also, I'm not in favor of a rework of Generator (using Metadata as part of From_frames) now, because I'm not sure to have the time to review this carefully enough, and we don't have the time to test it enough either, with an upcoming release. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I'm kinda weirded out by the amount of code as well.. It seems to me that there should be better path of least resistance.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, I think that this TODO is really deserved:
|
||
module Metadata = struct | ||
type t = | ||
{ | ||
mutable metadata : (int * Frame.metadata) list; | ||
mutable breaks : int list; | ||
mutable length : int; | ||
} | ||
|
||
let create () = | ||
{ | ||
metadata = []; | ||
breaks = []; | ||
length = 0; | ||
} | ||
|
||
let clear g = | ||
g.metadata <- []; | ||
g.breaks <- []; | ||
g.length <- 0 | ||
|
||
let advance g len = | ||
g.metadata <- List.map (fun (t,m) -> t-len, m) g.metadata; | ||
g.metadata <- List.filter (fun (t,_) -> t >= 0) g.metadata; | ||
g.breaks <- List.map (fun t -> t-len) g.breaks; | ||
g.breaks <- List.filter (fun t -> t >= 0) g.breaks; | ||
g.length <- g.length - len | ||
|
||
let length g = g.length | ||
|
||
let remaining g = | ||
match g.breaks with | ||
| a::_ -> a | ||
| _ -> -1 | ||
|
||
let metadata g len = | ||
List.filter (fun (t,m) -> t < len) g.metadata | ||
|
||
let feed_from_frame g frame = | ||
let size = Lazy.force Frame.size in | ||
let length = length g in | ||
g.metadata <- | ||
g.metadata @ | ||
(List.map | ||
(fun (t,m) -> length+t, m) | ||
(Frame.get_all_metadata frame)); | ||
g.breaks <- | ||
g.breaks @ | ||
(List.map | ||
(fun t -> length+t) | ||
(* Filter out the last break, which only marks the end of frame, not a | ||
* track limit (doesn't mean is_partial). *) | ||
(List.filter (fun x -> x < size) (Frame.breaks frame))); | ||
let frame_length = | ||
let rec aux = function | ||
| [t] -> t | ||
| _::tl -> aux tl | ||
| [] -> size | ||
in | ||
aux (Frame.breaks frame) | ||
in | ||
g.length <- g.length + frame_length | ||
|
||
let drop_initial_break g = | ||
match g.breaks with | ||
| 0::tl -> g.breaks <- tl | ||
| [] -> () (* end of stream / underrun... *) | ||
| _ -> assert false | ||
|
||
let fill g frame = | ||
let offset = Frame.position frame in | ||
let needed = | ||
let size = Lazy.force Frame.size in | ||
let remaining = remaining g in | ||
let remaining = if remaining = -1 then length g else remaining in | ||
min (size-offset) remaining | ||
in | ||
List.iter | ||
(fun (p,m) -> | ||
if p < needed then Frame.set_metadata frame (offset+p) m | ||
) g.metadata; | ||
advance g needed; | ||
(* Mark the end of this filling. If the frame is partial it must be because | ||
* of a break in the generator, or because the generator is emptying. | ||
* Conversely, each break in the generator must cause a partial frame, so | ||
* don't remove any if it isn't partial. *) | ||
Frame.add_break frame (offset+needed); | ||
if Frame.is_partial frame then drop_initial_break g | ||
end | ||
|
||
(** Generate a stream, including metadata and breaks. | ||
* The API is based on feeding from frames, and filling frames. *) | ||
module From_frames = | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this, it's a nice way to test buffers!