Skip to content

Commit

Permalink
Merge pull request #90 from savonet/websockets
Browse files Browse the repository at this point in the history
Websocket support for harbor
  • Loading branch information
toots committed Jul 27, 2013
2 parents 6c281a1 + 976387e commit 5f7e3f1
Show file tree
Hide file tree
Showing 13 changed files with 968 additions and 132 deletions.
15 changes: 14 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,20 @@ cat >> src/configure.ml <<EOCONF
let file_watcher = ref File_watcher_mtime.watch
EOCONF

# Yojson

AC_CHECK_OCAML_BINDING([yojson],[],[],[],[],[1])

if test -z "${W_YOJSON}"; then
cat >> src/configure.ml <<__BLAH__
module JSON = JSON
__BLAH__
else
cat >> src/configure.ml <<__BLAH__
module JSON = struct include Yojson.Basic let from_string s = from_string s end
__BLAH__
fi

# OCaml bindings

AC_CHECK_OCAML_BINDING([faad],[0.3.0])
Expand All @@ -754,7 +768,6 @@ AC_CHECK_OCAML_BINDING([dssi],[0.1.0])
AC_CHECK_OCAML_BINDING([sdl],[],[],[],[sdl.sdlmixer sdl.sdlttf sdl.sdlimage])
AC_CHECK_OCAML_BINDING([camlimages],[4.0.0],[],[],[camlimages.all_formats])
AC_CHECK_OCAML_BINDING([lo],[0.1.0])
AC_CHECK_OCAML_BINDING([yojson],[],[],[],[],[1])
AC_CHECK_OCAML_BINDING([gd])

# Winsvc (to run as a service)
Expand Down
12 changes: 6 additions & 6 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ decoders = \
decoder/metadata_decoder.ml \
decoder/wav_decoder.ml decoder/midi_decoder.ml \
decoder/image_decoder.ml decoder/image/ppm_decoder.ml \
decoder/external_decoder.ml
decoder/external_decoder.ml decoder/raw_audio_decoder.ml

$(call conditional_compilation,decoders,$(W_FLAC),decoder/flac_decoder.ml)
$(call conditional_compilation,decoders,$(W_FAAD),decoder/aac_decoder.ml)
Expand Down Expand Up @@ -192,10 +192,11 @@ tools = tools/stdlib.ml tools/doc.ml tools/plug.ml tools/utils.ml \
$(if $(W_LAME_DYN),tools/lame_dynlink.ml) \
$(if $(W_FDKAAC_DYN),tools/fdkaac_dynlink.ml) \
$(if $(W_AACPLUS_DYN),tools/aacplus_dynlink.ml) \
tools/wav.ml tools/tutils.ml \
tools/wav.ml tools/tutils.ml tools/JSON.ml \
tools/file_watcher.ml tools/file_watcher_mtime.ml \
configure.ml \
tools/http.ml tools/pool.ml
tools/http.ml tools/pool.ml \
tools/sha1.ml tools/websocket.ml

$(call conditional_compilation,tools,$(W_INOTIFY),tools/file_watcher_inotify.ml)

Expand All @@ -212,8 +213,7 @@ synth = synth/keyboard.ml synth/synth_op.ml

$(call conditional_compilation,synth,$(W_DSSI),synth/dssi_op.ml)

builtins = lang/lang_builtins.ml \
$(if $(W_YOJSON),lang/builtins_json.ml)
builtins = lang/lang_builtins.ml lang/builtins_json.ml

$(call conditional_compilation,builtins,$(W_LO),lang/builtins_lo.ml)
$(call conditional_compilation,builtins,$(W_MAGIC),lang/builtins_magic.ml)
Expand Down Expand Up @@ -291,7 +291,7 @@ liquidsoap_doc_sources= \
lang/lang.ml lang/lang_types.ml lang/lang_values.ml \
tools/server.ml tools/tutils.ml playlist_parser.ml \
tools/doc.ml tools/wav.ml tools/ioRing.ml \
tools/dyntools.ml \
tools/dyntools.ml tools/JSON.ml \
converters/audio_converter.ml converters/video_converter.ml \
ogg_formats/ogg_muxer.ml \
sources/generated.ml sources/request_source.ml sources/synthesized.ml \
Expand Down
3 changes: 3 additions & 0 deletions src/configure.mli
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,6 @@ val libs_versions : string

(** File watch utility. *)
val file_watcher : File_watcher.watch ref

(** JSON parser. *)
module JSON : (module type of JSON)
176 changes: 176 additions & 0 deletions src/decoder/raw_audio_decoder.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
(*****************************************************************************
Liquidsoap, a programmable audio stream generator.
Copyright 2003-2013 Savonet team
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details, fully stated in the COPYING
file at the root of the liquidsoap distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*****************************************************************************)

(** Decode raw data *)

open Stdlib

let log = Dtools.Log.make ["decoder";"raw"]

(** {1 Generic decoder} *)

exception End_of_stream

(* TODO: some code should be shared with wav decoder, and possibly others. *)

type format =
{
format : [ `S8 | `S16LE | `F32LE ];
channels : int;
interleaved : bool;
samplerate : float;
}

(** Bytes per sample. *)
let sample_size fmt =
match fmt.format with
| `S8 -> 1
| `S16LE -> 2
| `F32LE -> 4

module Make (Generator:Generator.S_Asio) = struct
let create ~format input =
(* TODO: can we handle interleaved with generators? I don't think so... *)
assert (format.interleaved = true);
let sample_size = sample_size format in
let channels = format.channels in
let bytes_to_get = sample_size * channels * 1024 in
let converter =
let audio_dst_rate = float (Lazy.force Frame.audio_rate) in
let ratio = audio_dst_rate /. format.samplerate in
let samplerate_converter = Audio_converter.Samplerate.create channels in
fun src ->
let len = String.length src / (sample_size * channels) in
let dst = Array.init channels (fun _ -> Array.make len 0.) in
let sample =
let pos = ref 0 in
match format.format with
| `S8 ->
fun () ->
let ans = int_of_char src.[!pos] in
let ans = if ans > 128 then ans - 256 else ans in
incr pos;
float ans /. 128.
| `S16LE ->
fun () ->
let ans = int_of_char src.[!pos] + (int_of_char src.[!pos+1] lsl 8) in
let ans = if ans > 32768 then ans - 65536 else ans in
pos := !pos + 2;
float ans /. 32768.
| `F32LE ->
(* TODO: handle endianness *)
fun () ->
let ans = ref Int32.zero in
for i = 3 downto 0 do
ans := Int32.shift_left !ans 8;
ans := Int32.add !ans (Int32.of_int (int_of_char src.[!pos + i]))
done;
pos := !pos + sample_size;
Int32.float_of_bits !ans
in
for i = 0 to len - 1 do
for c = 0 to channels - 1 do
dst.(c).(i) <- sample ()
done;
done;
let dst = Audio_converter.Samplerate.resample samplerate_converter ratio dst 0 len in
let dst_len = Array.length dst.(0) in
dst, dst_len
in
let decoder gen =
let data, bytes = input.Decoder.read bytes_to_get in
if bytes = 0 then raise End_of_stream;
let content, length = converter (String.sub data 0 bytes) in
Generator.set_mode gen `Audio;
Generator.put_audio gen content 0 length
in
(* TODO *)
let seek ticks = 0 in
{ Decoder.
decode = decoder;
seek = seek }
end

module Generator_plus = Generator.From_audio_video_plus
module Generator = Generator.From_audio_video
module Buffered = Decoder.Buffered(Generator)

(* TODO: we don't want a file decoder, do we? *)

module D_stream = Make(Generator_plus)

(* The mime types are inspired of GStreamer's convention. See
http://gstreamer.freedesktop.org/data/doc/gstreamer/head/pwg/html/section-types-definitions.html
For instance: audio/x-raw,format=F32LE,channels=2,layout=interleaved,rate=44100 *)
(* TODO: proper parser? *)
let parse_mime m =
let ans = ref { format = `F32LE; channels = 2; interleaved = true; samplerate = 44100. } in
try
let m = String.split_char ',' m in
if m = [] || List.hd m <> "audio/x-raw" then raise Exit;
let m = List.tl m in
let m =
List.map (fun lv ->
let lv = String.split_char '=' lv in
match lv with
| [l;v] -> l,v
| _ -> raise Exit
) m
in
List.iter (fun (l,v) ->
match l with
| "format" ->
let format = List.assoc v ["S8",`S8;"S16LE",`S16LE;"F32LE",`F32LE] in
ans := { !ans with format }
| "channels" ->
let channels = int_of_string v in
ans := { !ans with channels }
| "layout" ->
let interleaved =
if v = "interleaved" then true
else if v = "non-interleaved" then false
else raise Exit
in
ans := { !ans with interleaved }
| "rate" | "samplerate" ->
let samplerate = float_of_string v in
ans := { !ans with samplerate }
| _ -> failwith ("Unknown property: "^l)
) m;
Some !ans
with
| _ -> None

let () =
let (<:) a b = Frame.mul_sub_mul a b in
Decoder.stream_decoders#register
"raw audio"
~sdoc:"Decode audio/x-raw."
(fun mime kind ->
let mime = parse_mime mime in
match mime with
| Some format when
Frame.Zero <: kind.Frame.video
&& Frame.Zero <: kind.Frame.midi
&& Frame.mul_of_int format.channels <: kind.Frame.audio ->
Some (D_stream.create ~format)
| _ -> None)
2 changes: 1 addition & 1 deletion src/lang/builtins_json.ml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ let () =
let s = Lang.to_string (List.assoc "" p) in
try
let json =
Yojson.Basic.from_string s
Configure.JSON.from_string s
in
of_json default.Lang.t json
with
Expand Down
66 changes: 34 additions & 32 deletions src/sources/harbor_input.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
module Generator = Generator.From_audio_video_plus
module Generated = Generated.Make(Generator)

(* {1 Input handling} *)
(** Default function to read from a socket. *)
let default_read socket len =
let buf = String.create len in
let n = Unix.read socket buf 0 len in
buf, n

class http_input_server ~kind ~dumpfile ~logfile
~bufferize ~max ~icy ~port
Expand All @@ -42,8 +46,9 @@ object (self)
~log ~kind ~overfull:(`Drop_old max_ticks) `Undefined)
~empty_on_abort:false ~bufferize as generated

(** POSIX sucks.. *)
val mutable relay_socket = None
(** Function to read on socket. *)
val mutable relay_read = (fun socket len -> assert false)
(* Mutex used to protect socket's state (close) *)
val relay_m = Mutex.create ()
val mutable create_decoder = fun _ -> assert false
Expand Down Expand Up @@ -104,28 +109,28 @@ object (self)
self#log#f 3 "Decoding..." ;
let t0 = Unix.gettimeofday () in
let read len =
let buf,input = (fun len ->
let socket =
Tutils.mutexify relay_m (fun () ->
relay_socket) ()
in
let buf,input = (fun len ->
let socket = Tutils.mutexify relay_m (fun () -> relay_socket) () in
match socket with
| None -> "", 0
| Some socket ->
begin
try
(* Wait for `Read event on socket. *)
let log = self#log#f 4 "%s" in
Tutils.wait_for ~log `Read socket timeout;
(* Now read. *)
let buf = String.make len ' ' in
let input = Unix.read socket buf 0 len in
buf, input
with
| e -> self#log#f 2 "Error while reading from client: \
try
let rec f () =
try
(* Wait for `Read event on socket. *)
Tutils.wait_for ~log `Read socket timeout;
(* Now read. *)
relay_read socket len
with
| Harbor.Retry -> f ()
in
f ()
with
| e -> self#log#f 2 "Error while reading from client: \
%s" (Utils.error_message e);
self#disconnect ~lock:false;
"",0
self#disconnect ~lock:false;
"",0
end) len;
in
begin
Expand Down Expand Up @@ -196,32 +201,29 @@ object (self)
| Some d -> create_decoder <- d ; mime_type <- Some mime
| None -> raise Harbor.Unknown_codec

method relay stype (headers:(string*string) list) socket =
method relay stype (headers:(string*string) list) ?(read=default_read) socket =
Tutils.mutexify relay_m (fun () ->
if relay_socket <> None then
raise Harbor.Mount_taken ;
self#register_decoder stype ;
relay_socket <- Some socket) () ;
if relay_socket <> None then raise Harbor.Mount_taken;
self#register_decoder stype;
relay_socket <- Some socket;
relay_read <- read
) ();
on_connect headers ;
begin match dumpfile with
| Some f ->
begin try
dump <- Some (open_out_bin
(Utils.home_unrelate f))
dump <- Some (open_out_bin (Utils.home_unrelate f))
with e ->
self#log#f 2 "Could not open dump file: \
%s" (Utils.error_message e)
self#log#f 2 "Could not open dump file: %s" (Utils.error_message e)
end
| None -> ()
end ;
begin match logfile with
| Some f ->
begin try
logf <- Some (open_out_bin
(Utils.home_unrelate f))
logf <- Some (open_out_bin (Utils.home_unrelate f))
with e ->
self#log#f 2 "Could not open log file: \
%s" (Utils.error_message e)
self#log#f 2 "Could not open log file: %s" (Utils.error_message e)
end
| None -> ()
end ;
Expand Down
Loading

0 comments on commit 5f7e3f1

Please sign in to comment.