Skip to content

Commit

Permalink
Merge pull request #751 from cloudflare/jspspike/analytics-engine
Browse files Browse the repository at this point in the history
Add AnalyticsEngine binding type to workerd
  • Loading branch information
jspspike committed Jul 24, 2023
2 parents 7a1cfdb + 178a432 commit 404d6bb
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 9 deletions.
28 changes: 28 additions & 0 deletions src/workerd/api/analytics-engine-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import * as assert from 'node:assert';

let written = false;
async function isWritten(timeout) {
const start = Date.now();
do {
if (written) return true;
await scheduler.wait(100);
} while ((Date.now() - start) < timeout);
throw new Error("Test never received request from analytics engine handler");
}

export default {
async fetch(ctrl, env, ctx) {
written = true
return new Response("");
},
async test(ctrl, env, ctx) {
env.aebinding.writeDataPoint({
'blobs': ["TestBlob"],
'doubles': [25],
'indexes': ["testindex"],
});

assert.equal(await isWritten(5000), true);
return new Response("");
},
}
37 changes: 37 additions & 0 deletions src/workerd/api/analytics-engine-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Workerd = import "/workerd/workerd.capnp";

const analyticsWorker :Workerd.Worker = (
modules = [
(name = "worker", esModule =
`import * as assert from 'node:assert';
`export default {
` async fetch(request, env, ctx) {
` let val = await request.json();
` console.log(JSON.stringify(val));
` assert.deepStrictEqual(val['dataset'], [97,110,97,108,121,116,105,99,115]);
` assert.deepStrictEqual(val['double1'], 25);
` assert.deepStrictEqual(val['blob1'], [84,101,115,116,66,108,111,98]);
` await env.main.fetch("http://w/");
` return new Response('');
` },
`};
),
],
compatibilityFlags = ["experimental", "nodejs_compat"],
compatibilityDate = "2023-02-28",
bindings = [ ( name = "main", service = "main" ) ]
);

const mainWorker :Workerd.Worker = (
modules = [
(name = "worker", esModule = embed "analytics-engine-test.js"),
],
compatibilityFlags = ["experimental", "nodejs_compat"],
compatibilityDate = "2023-02-28",

bindings = [ ( name = "aebinding", analyticsEngine = "analytics") ]
);

const unitTests :Workerd.Config = (
services = [ (name = "main", worker = .mainWorker), (name = "analytics", worker = .analyticsWorker) ],
);
64 changes: 60 additions & 4 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include <kj/compat/url.h>
#include <kj/encoding.h>
#include <kj/map.h>
#include <capnp/message.h>
#include <capnp/compat/json.h>
#include <workerd/api/analytics-engine.capnp.h>
#include <workerd/io/worker-interface.h>
#include <workerd/io/worker-entrypoint.h>
#include <workerd/io/compatibility-date.h>
Expand All @@ -19,6 +22,7 @@
#include <openssl/pem.h>
#include <workerd/io/actor-cache.h>
#include <workerd/io/actor-sqlite.h>
#include <workerd/util/http-util.h>
#include <workerd/api/actor-state.h>
#include <workerd/util/mimetype.h>
#include "workerd-api.h"
Expand Down Expand Up @@ -1566,7 +1570,38 @@ private:

kj::Promise<void> writeLogfwdr(uint channel,
kj::FunctionParam<void(capnp::AnyPointer::Builder)> buildMessage) override {
KJ_FAIL_REQUIRE("no logging channels");
auto& context = IoContext::current();

auto headers = kj::HttpHeaders(context.getHeaderTable());
auto client = context.getHttpClient(channel, true, nullptr, "writeLogfwdr"_kjc);

auto urlStr = kj::str("https://fake-host");

capnp::MallocMessageBuilder requestMessage;
auto requestBuilder = requestMessage.initRoot<capnp::AnyPointer>();

buildMessage(requestBuilder);
capnp::JsonCodec json;
auto requestJson = json.encode(requestBuilder.getAs<api::AnalyticsEngineEvent>());

co_await context.waitForOutputLocks();

auto innerReq = client->request(kj::HttpMethod::POST, urlStr, headers, requestJson.size());

struct RefcountedWrapper: public kj::Refcounted {
explicit RefcountedWrapper(kj::Own<kj::HttpClient> client): client(kj::mv(client)) {}
kj::Own<kj::HttpClient> client;
};
auto rcClient = kj::refcounted<RefcountedWrapper>(kj::mv(client));
auto request = attachToRequest(kj::mv(innerReq), kj::mv(rcClient));

co_await request.body->write(requestJson.begin(), requestJson.size())
.attach(kj::mv(requestJson), kj::mv(request.body));
auto response = co_await request.response;

KJ_REQUIRE(response.statusCode >= 200 && response.statusCode < 300, "writeLogfwdr request returned an error");
co_await response.body->readAllBytes().attach(kj::mv(response.body)).ignoreResult();
co_return;
}

kj::Own<ActorChannel> getGlobalActor(uint channel, const ActorIdFactory::ActorId& id,
Expand Down Expand Up @@ -1649,7 +1684,8 @@ static kj::Maybe<WorkerdApiIsolate::Global> createBinding(
Worker::ValidationErrorReporter& errorReporter,
kj::Vector<FutureSubrequestChannel>& subrequestChannels,
kj::Vector<FutureActorChannel>& actorChannels,
kj::HashMap<kj::String, kj::HashMap<kj::String, Server::ActorConfig>>& actorConfigs) {
kj::HashMap<kj::String, kj::HashMap<kj::String, Server::ActorConfig>>& actorConfigs,
bool experimental) {
// creates binding object or returns null and reports an error
using Global = WorkerdApiIsolate::Global;
kj::StringPtr bindingName = binding.getName();
Expand Down Expand Up @@ -1883,7 +1919,7 @@ static kj::Maybe<WorkerdApiIsolate::Global> createBinding(
kj::Vector<Global> innerGlobals;
for (const auto& innerBinding: wrapped.getInnerBindings()) {
KJ_IF_MAYBE(global, createBinding(workerName, conf, innerBinding,
errorReporter, subrequestChannels, actorChannels, actorConfigs)) {
errorReporter, subrequestChannels, actorChannels, actorConfigs, experimental)) {
innerGlobals.add(kj::mv(*global));
} else {
// we've already communicated the error
Expand All @@ -1908,6 +1944,25 @@ static kj::Maybe<WorkerdApiIsolate::Global> createBinding(
}
}

case config::Worker::Binding::ANALYTICS_ENGINE: {
if (!experimental) {
errorReporter.addError(kj::str(
"AnalyticsEngine bindings are an experimental feature which may change or go away in the future."
"You must run workerd with `--experimental` to use this feature."));
}

uint channel = (uint)subrequestChannels.size() + IoContext::SPECIAL_SUBREQUEST_CHANNEL_COUNT;
subrequestChannels.add(FutureSubrequestChannel {
binding.getAnalyticsEngine(),
kj::mv(errorContext)
});

return makeGlobal(Global::AnalyticsEngine{
.subrequestChannel = channel,
.dataset = kj::str(binding.getAnalyticsEngine().getName()),
.version = 0,
});
}
}
errorReporter.addError(kj::str(
errorContext, "has unrecognized type. Was the config compiled with a newer version of "
Expand Down Expand Up @@ -2031,7 +2086,8 @@ kj::Own<Server::Service> Server::makeWorker(kj::StringPtr name, config::Worker::
kj::Vector<Global> globals(confBindings.size());
for (auto binding: confBindings) {
KJ_IF_MAYBE(global, createBinding(name, conf, binding, errorReporter,
subrequestChannels, actorChannels, actorConfigs)) {
subrequestChannels, actorChannels, actorConfigs,
experimental)) {
globals.add(kj::mv(*global));
}
}
Expand Down
16 changes: 13 additions & 3 deletions src/workerd/server/workerd-api.c++
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ private:
static v8::Local<v8::Value> createBindingValue(
JsgWorkerdIsolate::Lock& lock,
const WorkerdApiIsolate::Global& global,
CompatibilityFlags::Reader featureFlags) {
CompatibilityFlags::Reader featureFlags,
uint32_t ownerId) {
using Global = WorkerdApiIsolate::Global;
auto context = lock.v8Context();

Expand Down Expand Up @@ -561,6 +562,12 @@ static v8::Local<v8::Value> createBindingValue(
kj::heap<ActorIdFactoryImpl>(ns.uniqueKey)));
}

KJ_CASE_ONEOF(ae, Global::AnalyticsEngine) {
// Use subrequestChannel as logfwdrChannel
value = lock.wrap(context, jsg::alloc<api::AnalyticsEngine>(ae.subrequestChannel,
kj::str(ae.dataset), ae.version, ownerId));
}

KJ_CASE_ONEOF(text, kj::String) {
value = lock.wrap(context, kj::mv(text));
}
Expand All @@ -585,7 +592,7 @@ static v8::Local<v8::Value> createBindingValue(
for (const auto& innerBinding: wrapped.innerBindings) {
jsg::check(env->Set(context,
lock.wrapString(innerBinding.name),
createBindingValue(lock, innerBinding, featureFlags)));
createBindingValue(lock, innerBinding, featureFlags, ownerId)));
}

// obtain exported function to call
Expand Down Expand Up @@ -620,7 +627,7 @@ void WorkerdApiIsolate::compileGlobals(

// Don't use String's usual TypeHandler here because we want to intern the string.
auto name = jsg::v8StrIntern(lock.v8Isolate, global.name);
auto value = createBindingValue(lock, global, featureFlags);
auto value = createBindingValue(lock, global, featureFlags, ownerId);

KJ_ASSERT(!value.IsEmpty(), "global did not produce v8::Value");
bool setResult = jsg::check(target->Set(context, name, value));
Expand Down Expand Up @@ -666,6 +673,9 @@ WorkerdApiIsolate::Global WorkerdApiIsolate::Global::clone() const {
KJ_CASE_ONEOF(ns, Global::DurableActorNamespace) {
result.value = ns.clone();
}
KJ_CASE_ONEOF(ae, Global::AnalyticsEngine) {
result.value = ae.clone();
}
KJ_CASE_ONEOF(text, kj::String) {
result.value = kj::str(text);
}
Expand Down
15 changes: 14 additions & 1 deletion src/workerd/server/workerd-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,22 @@ class WorkerdApiIsolate final: public Worker::ApiIsolate {
};
}
};
struct AnalyticsEngine {
uint subrequestChannel;
kj::String dataset;
int64_t version;
AnalyticsEngine clone() const {
return AnalyticsEngine {
.subrequestChannel = subrequestChannel,
.dataset = kj::str(dataset),
.version = version
};
}
};
kj::String name;
kj::OneOf<Json, Fetcher, KvNamespace, R2Bucket, R2Admin, CryptoKey, EphemeralActorNamespace,
DurableActorNamespace, QueueBinding, kj::String, kj::Array<byte>, Wrapped> value;
DurableActorNamespace, QueueBinding, kj::String, kj::Array<byte>, Wrapped,
AnalyticsEngine> value;

Global clone() const;
};
Expand Down
8 changes: 7 additions & 1 deletion src/workerd/server/workerd.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,12 @@ struct Worker {
# `getenv()` with that name. If the environment variable isn't set, the binding value is
# `null`.

# TODO(someday): dispatch, analyticsEngine, other new features
analyticsEngine @17 :ServiceDesignator;
# A binding for Analytics Engine. Allows workers to store information through Analytics Engine Events.
# workerd will forward AnalyticsEngineEvents to designated service in the body of HTTP requests
# This binding is subject to change and requires the `--experimental` flag

# TODO(someday): dispatch, other new features
}

struct Type {
Expand All @@ -372,6 +377,7 @@ struct Worker {
r2Bucket @9 :Void;
r2Admin @10 :Void;
queue @11 :Void;
analyticsEngine @12 : Void;
}
}

Expand Down

0 comments on commit 404d6bb

Please sign in to comment.