Skip to content

Commit

Permalink
[api] Support pipeline startup argument when calling pipy.read()
Browse files Browse the repository at this point in the history
  • Loading branch information
pajama-coder committed Jun 26, 2024
1 parent a257a6f commit 4613856
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
26 changes: 16 additions & 10 deletions src/api/pipy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ void Pipy::listen(pjs::Context &ctx) {
pjs::Str *protocol = nullptr;
pjs::Object *options = nullptr;
pjs::Function *builder = nullptr;
PipelineLayoutWrapper *plw = nullptr;
PipelineLayoutWrapper *ptw = nullptr;

if (ctx.get(i, address) || ctx.get(i, port)) i++;
else {
Expand All @@ -395,9 +395,9 @@ void Pipy::listen(pjs::Context &ctx) {

if (ctx.get(i, protocol)) i++;

if (!ctx.get(i, builder) && !ctx.get(i, plw)) {
if (!ctx.get(i, builder) && !ctx.get(i, ptw)) {
if (!ctx.check(i++, options)) return;
if (!ctx.check(i, builder) && !ctx.check(i, plw)) return;
if (!ctx.get(i, builder) && !ctx.get(i, ptw)) return ctx.error_argument_type(i, "a function or a pipeline template");
}

auto proto = Port::Protocol::TCP;
Expand Down Expand Up @@ -442,8 +442,8 @@ void Pipy::listen(pjs::Context &ctx) {
}

PipelineLayout *pl = nullptr;
if (plw) {
pl = plw->get();
if (ptw) {
pl = ptw->get();
} else if (builder) {
pl = PipelineDesigner::make_pipeline_layout(ctx, builder);
if (!pl) return;
Expand Down Expand Up @@ -540,20 +540,22 @@ Pipy::FileReader::FileReader(Worker *worker, pjs::Str *pathname, PipelineLayout
{
}

auto Pipy::FileReader::start() -> pjs::Promise* {
auto Pipy::FileReader::start(const pjs::Value &arg) -> pjs::Promise* {
auto promise = pjs::Promise::make();
m_settler = pjs::Promise::Settler::make(promise);
m_start_arg = arg;
m_file->open_read([this](FileStream *fs) { on_open(fs); });
retain();
return promise;
}

void Pipy::FileReader::on_open(FileStream *fs) {
InputContext ic;
if (fs) {
m_pipeline = Pipeline::make(m_pt, m_worker->new_context());
m_pipeline->on_end(this);
m_pipeline->chain(EventTarget::input());
m_pipeline->start();
m_pipeline->start(1, &m_start_arg);
fs->chain(m_pipeline->input());
} else {
std::string msg = "cannot open file: " + m_pathname->str();
Expand Down Expand Up @@ -816,11 +818,15 @@ template<> void ClassDef<Pipy>::init() {
auto worker = instance ? static_cast<Worker*>(instance) : nullptr;
Str *pathname;
Function *builder = nullptr;
if (!ctx.arguments(2, &pathname, &builder)) return;
auto pt = PipelineDesigner::make_pipeline_layout(ctx, builder);
PipelineLayoutWrapper *ptw = nullptr;
Value start_arg;
if (!ctx.check(0, pathname)) return;
if (!ctx.get(1, builder) && !ctx.get(1, ptw)) return ctx.error_argument_type(1, "a function or a pipeline template");
ctx.get(2, start_arg);
auto pt = ptw ? ptw->get() : PipelineDesigner::make_pipeline_layout(ctx, builder);
if (!pt) return;
auto fr = new Pipy::FileReader(worker, pathname, pt);
ret.set(fr->start());
ret.set(fr->start(start_arg));
});

method("listen", [](Context &ctx, Object*, Value &ret) {
Expand Down
3 changes: 2 additions & 1 deletion src/api/pipy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Pipy : public pjs::FunctionTemplate<Pipy> {
public:
FileReader(Worker *worker, pjs::Str *pathname, PipelineLayout *pt);

auto start() -> pjs::Promise*;
auto start(const pjs::Value &args) -> pjs::Promise*;

private:
pjs::Ref<Worker> m_worker;
Expand All @@ -80,6 +80,7 @@ class Pipy : public pjs::FunctionTemplate<Pipy> {
pjs::Ref<File> m_file;
pjs::Ref<pjs::Promise> m_promise;
pjs::Ref<pjs::Promise::Settler> m_settler;
pjs::Value m_start_arg;

void on_open(FileStream *fs);

Expand Down

0 comments on commit 4613856

Please sign in to comment.