Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async send. #27

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/Modules/")
ADD_TEST(ev_timer ${LUA} ${CMAKE_CURRENT_SOURCE_DIR}/test/test_ev_timer.lua ${CMAKE_CURRENT_SOURCE_DIR}/test/ ${CMAKE_CURRENT_BINARY_DIR}/)
ADD_TEST(ev_idle ${LUA} ${CMAKE_CURRENT_SOURCE_DIR}/test/test_ev_idle.lua ${CMAKE_CURRENT_SOURCE_DIR}/test/ ${CMAKE_CURRENT_BINARY_DIR}/)
ADD_TEST(ev_signal ${LUA} ${CMAKE_CURRENT_SOURCE_DIR}/test/test_ev_signal.lua ${CMAKE_CURRENT_SOURCE_DIR}/test/ ${CMAKE_CURRENT_BINARY_DIR}/)
ADD_TEST(ev_async ${LUA} ${CMAKE_CURRENT_SOURCE_DIR}/test/test_ev_async.lua ${CMAKE_CURRENT_SOURCE_DIR}/test/ ${CMAKE_CURRENT_BINARY_DIR}/)
ADD_TEST(ev_child ${LUA} ${CMAKE_CURRENT_SOURCE_DIR}/test/test_ev_child.lua ${CMAKE_CURRENT_SOURCE_DIR}/test/ ${CMAKE_CURRENT_BINARY_DIR}/)
ADD_TEST(ev_stat ${LUA} ${CMAKE_CURRENT_SOURCE_DIR}/test/test_ev_stat.lua ${CMAKE_CURRENT_SOURCE_DIR}/test/ ${CMAKE_CURRENT_BINARY_DIR}/)
SET_TESTS_PROPERTIES(ev_io ev_loop ev_timer ev_signal ev_idle ev_child ev_stat
Expand Down
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,20 @@ order for it to take effect.
The on_async function will be called with these arguments (return
values are ignored):

### string = ev.Async.export(async, loop)

Export serialized send for multi-threading signalization (low-level API).

### async_send = ev.Async.import(string)

Import serialized send for multi-threading signalization (low-level API).

Return a function which will act as `async:send(loop)` when called.

Calling the function beyond the lifespan of either the exported loop or async
object will result in undefined behavior (crash/corruption). The same applies
with an ill-formed string.

### on_async(loop, idle, revents)

The loop is the event loop for which the idle object is
Expand Down Expand Up @@ -250,6 +264,11 @@ also `EV_TIMEOUT` C definition.
If this bit is set, the watcher was triggered by a signal. See
also `EV_SIGNAL` C definition.

### ev.ASYNC (constant)

If this bit is set, the watcher has been asynchronously notified. See also
`EV_ASYNC` C definition.

### ev.CHILD (constant)

If this bit is set, the watcher was triggered by a child signal.
Expand Down Expand Up @@ -448,6 +467,13 @@ Ensures that the watcher is neither active nor pending.

See also `ev_async_stop()` C function (document as `ev_TYPE_stop()`).

### async:send(loop)

Sends/signals/activates the given "ev_async" watcher, that is, feeds an
"EV_ASYNC" event on the watcher into the event loop, and instantly returns.

See also `ev_async_send()` C function.

## ev.Child object methods

### child:start(loop [, is_daemon])
Expand Down
88 changes: 88 additions & 0 deletions async_lua_ev.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ static int luaopen_ev_async(lua_State *L) {
lua_pushcfunction(L, async_new);
lua_setfield(L, -2, "new");

lua_pushcfunction(L, async_export);
lua_setfield(L, -2, "export");

lua_pushcfunction(L, async_import);
lua_setfield(L, -2, "import");

return 1;
}

Expand All @@ -23,6 +29,7 @@ static int luaopen_ev_async(lua_State *L) {
static int create_async_mt(lua_State *L) {

static luaL_Reg fns[] = {
{ "send", async_send },
{ "stop", async_stop },
{ "start", async_start },
{ NULL, NULL }
Expand Down Expand Up @@ -50,6 +57,69 @@ static int async_new(lua_State* L) {
return 1;
}

struct async_serialized {
ev_async* async;
struct ev_loop* loop;
};

static int async_sz_send(lua_State *L) {
void *udata = lua_touserdata(L, lua_upvalueindex(1));
struct async_serialized* data = (struct async_serialized*)udata;

ev_async_send(data->loop, data->async);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we ensure the lifetime of the data->async is still valid when this is called?


return 0;
}

/**
* Export serialized send for multi-threading signalization (low-level API).
*
* Return string.
*
* Usage:
* string = Async.export(async, loop)
*
* [+1, -0, ?]
*/
static int async_export(lua_State *L) {
ev_async* async = check_async(L, 1);
struct ev_loop* loop = *check_loop_and_init(L, 2);

struct async_serialized data;
data.async = async;
data.loop = loop;
lua_pushlstring(L, (const char*)&data, sizeof(struct async_serialized));

return 1;
}

/**
* Import serialized send for multi-threading signalization (low-level API).
*
* Return a function which will act as async:send(loop) when called.
*
* Calling the function beyond the lifespan of either the exported loop or
* async object will result in undefined behavior (crash/corruption). The same
* applies with an ill-formed string.
*
* Usage:
* async_send = Async.import(string)
*
* [+1, -0, ?]
*/
static int async_import(lua_State *L) {
size_t len;
const char* str = luaL_checklstring(L, 1, &len);
if ( len != sizeof(struct async_serialized) )
luaL_error(L, "invalid data size");

void* data = lua_newuserdata(L, sizeof(struct async_serialized));
memcpy(data, str, sizeof(struct async_serialized));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we ensure that the memory pointed to by the fields of the async_serialized structure are still valid?

lua_pushcclosure(L, async_sz_send, 1);

return 1;
}

/**
* @see watcher_cb()
*
Expand All @@ -59,6 +129,24 @@ static void async_cb(struct ev_loop* loop, ev_async* async, int revents) {
watcher_cb(loop, async, revents);
}

/**
* Sends/signals/activates the given "ev_async" watcher, that is, feeds an
* "EV_ASYNC" event on the watcher into the event loop, and instantly returns.
*
* Usage:
* async:send(loop)
*
* [+0, -0, e]
*/
static int async_send(lua_State *L) {
ev_async* async = check_async(L, 1);
struct ev_loop* loop = *check_loop_and_init(L, 2);

ev_async_send(loop, async);

return 0;
}

/**
* Stops the async so it won't be called by the specified event loop.
*
Expand Down
3 changes: 3 additions & 0 deletions lua_ev.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@ static int io_getfd(lua_State *L);
static int luaopen_ev_async(lua_State *L);
static int create_async_mt(lua_State *L);
static int async_new(lua_State* L);
static int async_import(lua_State* L);
static int async_export(lua_State* L);
static void async_cb(struct ev_loop* loop, ev_async* async, int revents);
static int async_send(lua_State *L);
static int async_stop(lua_State *L);
static int async_start(lua_State *L);

Expand Down
40 changes: 40 additions & 0 deletions test/test_ev_async.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
print '1..20'

local src_dir, build_dir = ...
package.path = src_dir .. "?.lua;" .. package.path
package.cpath = build_dir .. "?.so;" .. package.cpath

local tap = require("tap")
local ev = require("ev")
local help = require("help")
local dump = require("dumper").dump
local ok = tap.ok

local noleaks = help.collect_and_assert_no_watchers
local loop = ev.Loop.default

function test_basic()
local async1 = ev.Async.new(
function(loop, async, revents)
ok(true, 'async callback')
ok(ev.ASYNC == revents, 'ev.ASYNC(' .. ev.ASYNC .. ') == revents (' .. revents .. ')')
end)
async1:start(loop)
async1:send(loop)
loop:loop()
end

function test_export()
local async1 = ev.Async.new(
function(loop, async, revents)
ok(true, 'async callback')
ok(ev.ASYNC == revents, 'ev.ASYNC(' .. ev.ASYNC .. ') == revents (' .. revents .. ')')
end)
local send = ev.Async.import(ev.Async.export(async1, loop))
async1:start(loop)
send()
loop:loop()
end

noleaks(test_basic, "test_basic")
noleaks(test_basic, "test_export")