diff --git a/src/workerd/api/analytics-engine-test.js b/src/workerd/api/analytics-engine-test.js new file mode 100644 index 00000000000..00007963593 --- /dev/null +++ b/src/workerd/api/analytics-engine-test.js @@ -0,0 +1,28 @@ +import * as assert from 'node:assert'; + +let written = false; +async function isWritten(timeout) { + const start = Date.now(); + do { + if (written) return true; + await scheduler.wait(100); + } while ((Date.now() - start) < timeout); + throw new Error("Test never received request from analytics engine handler"); +} + +export default { + async fetch(ctrl, env, ctx) { + written = true + return new Response(""); + }, + async test(ctrl, env, ctx) { + env.aebinding.writeDataPoint({ + 'blobs': ["TestBlob"], + 'doubles': [25], + 'indexes': ["testindex"], + }); + + assert.equal(await isWritten(5000), true); + return new Response(""); + }, +} diff --git a/src/workerd/api/analytics-engine-test.wd-test b/src/workerd/api/analytics-engine-test.wd-test new file mode 100644 index 00000000000..3b4f4587fc5 --- /dev/null +++ b/src/workerd/api/analytics-engine-test.wd-test @@ -0,0 +1,37 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const analyticsWorker :Workerd.Worker = ( + modules = [ + (name = "worker", esModule = + `import * as assert from 'node:assert'; + `export default { + ` async fetch(request, env, ctx) { + ` let val = await request.json(); + ` console.log(JSON.stringify(val)); + ` assert.deepStrictEqual(val['dataset'], [97,110,97,108,121,116,105,99,115]); + ` assert.deepStrictEqual(val['double1'], 25); + ` assert.deepStrictEqual(val['blob1'], [84,101,115,116,66,108,111,98]); + ` await env.main.fetch("http://w/"); + ` return new Response(''); + ` }, + `}; + ), + ], + compatibilityFlags = ["experimental", "nodejs_compat"], + compatibilityDate = "2023-02-28", + bindings = [ ( name = "main", service = "main" ) ] +); + +const mainWorker :Workerd.Worker = ( + modules = [ + (name = "worker", esModule = embed "analytics-engine-test.js"), + ], + compatibilityFlags = ["experimental", "nodejs_compat"], + compatibilityDate = "2023-02-28", + + bindings = [ ( name = "aebinding", analyticsEngine = "analytics") ] +); + +const unitTests :Workerd.Config = ( + services = [ (name = "main", worker = .mainWorker), (name = "analytics", worker = .analyticsWorker) ], +); diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 83cee9611a5..0cf5ac9df04 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -9,6 +9,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -19,6 +22,7 @@ #include #include #include +#include #include #include #include "workerd-api.h" @@ -1566,7 +1570,38 @@ private: kj::Promise writeLogfwdr(uint channel, kj::FunctionParam buildMessage) override { - KJ_FAIL_REQUIRE("no logging channels"); + auto& context = IoContext::current(); + + auto headers = kj::HttpHeaders(context.getHeaderTable()); + auto client = context.getHttpClient(channel, true, nullptr, "writeLogfwdr"_kjc); + + auto urlStr = kj::str("https://fake-host"); + + capnp::MallocMessageBuilder requestMessage; + auto requestBuilder = requestMessage.initRoot(); + + buildMessage(requestBuilder); + capnp::JsonCodec json; + auto requestJson = json.encode(requestBuilder.getAs()); + + co_await context.waitForOutputLocks(); + + auto innerReq = client->request(kj::HttpMethod::POST, urlStr, headers, requestJson.size()); + + struct RefcountedWrapper: public kj::Refcounted { + explicit RefcountedWrapper(kj::Own client): client(kj::mv(client)) {} + kj::Own client; + }; + auto rcClient = kj::refcounted(kj::mv(client)); + auto request = attachToRequest(kj::mv(innerReq), kj::mv(rcClient)); + + co_await request.body->write(requestJson.begin(), requestJson.size()) + .attach(kj::mv(requestJson), kj::mv(request.body)); + auto response = co_await request.response; + + KJ_REQUIRE(response.statusCode >= 200 && response.statusCode < 300, "writeLogfwdr request returned an error"); + co_await response.body->readAllBytes().attach(kj::mv(response.body)).ignoreResult(); + co_return; } kj::Own getGlobalActor(uint channel, const ActorIdFactory::ActorId& id, @@ -1649,7 +1684,8 @@ static kj::Maybe createBinding( Worker::ValidationErrorReporter& errorReporter, kj::Vector& subrequestChannels, kj::Vector& actorChannels, - kj::HashMap>& actorConfigs) { + kj::HashMap>& actorConfigs, + bool experimental) { // creates binding object or returns null and reports an error using Global = WorkerdApiIsolate::Global; kj::StringPtr bindingName = binding.getName(); @@ -1883,7 +1919,7 @@ static kj::Maybe createBinding( kj::Vector innerGlobals; for (const auto& innerBinding: wrapped.getInnerBindings()) { KJ_IF_MAYBE(global, createBinding(workerName, conf, innerBinding, - errorReporter, subrequestChannels, actorChannels, actorConfigs)) { + errorReporter, subrequestChannels, actorChannels, actorConfigs, experimental)) { innerGlobals.add(kj::mv(*global)); } else { // we've already communicated the error @@ -1908,6 +1944,25 @@ static kj::Maybe createBinding( } } + case config::Worker::Binding::ANALYTICS_ENGINE: { + if (!experimental) { + errorReporter.addError(kj::str( + "AnalyticsEngine bindings are an experimental feature which may change or go away in the future." + "You must run workerd with `--experimental` to use this feature.")); + } + + uint channel = (uint)subrequestChannels.size() + IoContext::SPECIAL_SUBREQUEST_CHANNEL_COUNT; + subrequestChannels.add(FutureSubrequestChannel { + binding.getAnalyticsEngine(), + kj::mv(errorContext) + }); + + return makeGlobal(Global::AnalyticsEngine{ + .subrequestChannel = channel, + .dataset = kj::str(binding.getAnalyticsEngine().getName()), + .version = 0, + }); + } } errorReporter.addError(kj::str( errorContext, "has unrecognized type. Was the config compiled with a newer version of " @@ -2031,7 +2086,8 @@ kj::Own Server::makeWorker(kj::StringPtr name, config::Worker:: kj::Vector globals(confBindings.size()); for (auto binding: confBindings) { KJ_IF_MAYBE(global, createBinding(name, conf, binding, errorReporter, - subrequestChannels, actorChannels, actorConfigs)) { + subrequestChannels, actorChannels, actorConfigs, + experimental)) { globals.add(kj::mv(*global)); } } diff --git a/src/workerd/server/workerd-api.c++ b/src/workerd/server/workerd-api.c++ index fd6068c95dc..f9b9388c712 100644 --- a/src/workerd/server/workerd-api.c++ +++ b/src/workerd/server/workerd-api.c++ @@ -487,7 +487,8 @@ private: static v8::Local createBindingValue( JsgWorkerdIsolate::Lock& lock, const WorkerdApiIsolate::Global& global, - CompatibilityFlags::Reader featureFlags) { + CompatibilityFlags::Reader featureFlags, + uint32_t ownerId) { using Global = WorkerdApiIsolate::Global; auto context = lock.v8Context(); @@ -561,6 +562,12 @@ static v8::Local createBindingValue( kj::heap(ns.uniqueKey))); } + KJ_CASE_ONEOF(ae, Global::AnalyticsEngine) { + // Use subrequestChannel as logfwdrChannel + value = lock.wrap(context, jsg::alloc(ae.subrequestChannel, + kj::str(ae.dataset), ae.version, ownerId)); + } + KJ_CASE_ONEOF(text, kj::String) { value = lock.wrap(context, kj::mv(text)); } @@ -585,7 +592,7 @@ static v8::Local createBindingValue( for (const auto& innerBinding: wrapped.innerBindings) { jsg::check(env->Set(context, lock.wrapString(innerBinding.name), - createBindingValue(lock, innerBinding, featureFlags))); + createBindingValue(lock, innerBinding, featureFlags, ownerId))); } // obtain exported function to call @@ -620,7 +627,7 @@ void WorkerdApiIsolate::compileGlobals( // Don't use String's usual TypeHandler here because we want to intern the string. auto name = jsg::v8StrIntern(lock.v8Isolate, global.name); - auto value = createBindingValue(lock, global, featureFlags); + auto value = createBindingValue(lock, global, featureFlags, ownerId); KJ_ASSERT(!value.IsEmpty(), "global did not produce v8::Value"); bool setResult = jsg::check(target->Set(context, name, value)); @@ -666,6 +673,9 @@ WorkerdApiIsolate::Global WorkerdApiIsolate::Global::clone() const { KJ_CASE_ONEOF(ns, Global::DurableActorNamespace) { result.value = ns.clone(); } + KJ_CASE_ONEOF(ae, Global::AnalyticsEngine) { + result.value = ae.clone(); + } KJ_CASE_ONEOF(text, kj::String) { result.value = kj::str(text); } diff --git a/src/workerd/server/workerd-api.h b/src/workerd/server/workerd-api.h index ff83653ac1d..360ff2ab0c0 100644 --- a/src/workerd/server/workerd-api.h +++ b/src/workerd/server/workerd-api.h @@ -141,9 +141,22 @@ class WorkerdApiIsolate final: public Worker::ApiIsolate { }; } }; + struct AnalyticsEngine { + uint subrequestChannel; + kj::String dataset; + int64_t version; + AnalyticsEngine clone() const { + return AnalyticsEngine { + .subrequestChannel = subrequestChannel, + .dataset = kj::str(dataset), + .version = version + }; + } + }; kj::String name; kj::OneOf, Wrapped> value; + DurableActorNamespace, QueueBinding, kj::String, kj::Array, Wrapped, + AnalyticsEngine> value; Global clone() const; }; diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index 2c16bb50aae..fa315fcd646 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -351,7 +351,12 @@ struct Worker { # `getenv()` with that name. If the environment variable isn't set, the binding value is # `null`. - # TODO(someday): dispatch, analyticsEngine, other new features + analyticsEngine @17 :ServiceDesignator; + # A binding for Analytics Engine. Allows workers to store information through Analytics Engine Events. + # workerd will forward AnalyticsEngineEvents to designated service in the body of HTTP requests + # This binding is subject to change and requires the `--experimental` flag + + # TODO(someday): dispatch, other new features } struct Type { @@ -372,6 +377,7 @@ struct Worker { r2Bucket @9 :Void; r2Admin @10 :Void; queue @11 :Void; + analyticsEngine @12 : Void; } }