Skip to content

Commit

Permalink
REST listener improvements
Browse files Browse the repository at this point in the history
- Added "Server:" response header
- Added "ETag:" response header to document response
- Honor "If-None-Match" request header getting document
- Added "total_rows" and "update_seq" to _all_docs response
- Suppress response body on HEAD request
- _changes feed improvements:
  - Flush output before waiting
  - No heartbeat if ?heartbeat=0
  - Longest possible timeout if ?timeout=0
  - Stop on socket error
  - Make logging verbose not info
  • Loading branch information
snej committed Aug 23, 2024
1 parent 5d99ec1 commit 301b4af
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 26 deletions.
60 changes: 38 additions & 22 deletions REST/RESTListener+Changes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
// the file licenses/APL2.txt.
//

// Reference:
// https://docs.couchbase.com/sync-gateway/current/rest-api.html#/Database_Management/get_keyspace__changes
// https://docs.couchdb.org/en/stable/api/database/changes.html

#include "RESTListener.hh"
#include "Error.hh"
#include "Timer.hh"
Expand All @@ -28,8 +32,6 @@ using namespace fleece;
namespace litecore::REST {
using namespace net;

// API documentation: https://docs.couchdb.org/en/stable/api/database/changes.html

static constexpr uint64_t kDefaultHeartbeatMS = 60000;
static constexpr uint64_t kMinHeartbeatMS = 5000;
static constexpr uint64_t kMaxHeartbeatMS = 60000;
Expand Down Expand Up @@ -61,9 +63,10 @@ namespace litecore::REST {
//TODO: ?doc_ids, ?filter
}

~ChangesTask() { c4log(ListenerLog, kC4LogInfo, "Destructing ChangesTask %p", this); }
~ChangesTask() { c4log(ListenerLog, kC4LogVerbose, "ChangesTask %p deleted", this); }

void runImmediate() {
_rq.uncacheable();
if ( _feed == unknown ) {
_rq.respondWithStatus(HTTPStatus::BadRequest, "unsupported feed type");
_rq.finish();
Expand All @@ -89,6 +92,7 @@ namespace litecore::REST {
if ( _feed == continuous || (_feed == longpoll && _sent == 0 && _limit > 0) ) {
// In continuous mode, we always wait for more changes.
// In longpoll mode, we wait if there are no changes yet.
_rq.flush();
wait();
} else {
_rq.printf("], \"last_seq\":%llu}", uint64_t(_lastSequence));
Expand All @@ -112,7 +116,8 @@ namespace litecore::REST {
{
unique_lock lock(_mutex);
if ( !_rq.finished() ) {
if ( _feed != continuous ) _rq.printf("\n], \"last_seq\":%llu}", uint64_t(_lastSequence));
if ( _feed != continuous && !_rq.socketError() )
_rq.printf("\n], \"last_seq\":%llu}", uint64_t(_lastSequence));
_rq.finish();
}
}
Expand Down Expand Up @@ -145,25 +150,30 @@ namespace litecore::REST {
// Set a heartbeat timer that sends a newline periodically:
uint64_t intervalMS = kDefaultHeartbeatMS;
if ( hb != "true" ) intervalMS = _rq.uintQuery("heartbeat", kDefaultHeartbeatMS);
intervalMS = std::max(intervalMS, kMinHeartbeatMS);
intervalMS = std::min(intervalMS, kMaxHeartbeatMS);
chrono::milliseconds interval(intervalMS);
_heartbeatTimer.emplace([this, interval] { this->heartbeat(interval); });
_heartbeatTimer->fireAfter(interval);
} else {
if ( intervalMS > 0 ) {
intervalMS = std::max(intervalMS, kMinHeartbeatMS);
intervalMS = std::min(intervalMS, kMaxHeartbeatMS);
chrono::milliseconds interval(intervalMS);
_heartbeatTimer.emplace([this, interval] { this->heartbeat(interval); });
_heartbeatTimer->fireAfter(interval);
}
}
if ( !_heartbeatTimer ) {
// Or set a timeout timer to stop the task:
auto timeout = _rq.uintQuery("timeout", kDefaultTimeoutMS);
timeout = std::min(timeout, kMaxTimeoutMS);
if ( timeout == 0 ) timeout = kMaxTimeoutMS;
timeout = std::min(timeout, kMaxTimeoutMS);
_timeoutTimer.emplace([this] { this->timeout(); });
_timeoutTimer->fireAfter(chrono::milliseconds(timeout));
}
c4log(ListenerLog, kC4LogInfo, "ChangesTask %p waiting...", this);
c4log(ListenerLog, kC4LogVerbose, "ChangesTask %p waiting...", this);
registerTask();
}

// Called when the collection has changedd:
void observeChange() {
c4log(ListenerLog, kC4LogInfo, "ChangesTask %p got changes!", this);
c4log(ListenerLog, kC4LogVerbose, "ChangesTask %p got changes!", this);
bool doStop = false;
{
unique_lock lock(_mutex);
if ( _rq.finished() ) return;
Expand Down Expand Up @@ -196,22 +206,28 @@ namespace litecore::REST {
writeChange(info, body);
}
}
if ( _feed == continuous ) _rq.flush();
doStop = (_feed == longpoll && _sent > 0) || _rq.socketError() != kC4NoError;
}
if ( _feed == continuous ) _rq.flush();
else if ( _sent > 0 ) { stop(); }
if ( doStop ) { stop(); }
}

void heartbeat(chrono::milliseconds interval) {
c4log(ListenerLog, kC4LogInfo, "ChangesTask %p heartbeat", this);
unique_lock lock(_mutex);
if ( _rq.finished() ) return;
_rq.write("\n");
_rq.flush();
_heartbeatTimer->fireAfter(interval);
c4log(ListenerLog, kC4LogVerbose, "ChangesTask %p heartbeat", this);
bool doStop = false;
{
unique_lock lock(_mutex);
if ( _rq.finished() ) return;
_rq.write("\n");
_rq.flush();
_heartbeatTimer->fireAfter(interval);
doStop = _rq.socketError() != kC4NoError;
}
if ( doStop ) { stop(); }
}

void timeout() {
c4log(ListenerLog, kC4LogInfo, "ChangesTask %p timed out", this);
c4log(ListenerLog, kC4LogVerbose, "ChangesTask %p timed out", this);
{
unique_lock lock(_mutex);
if ( _rq.finished() ) return;
Expand Down
23 changes: 20 additions & 3 deletions REST/RESTListener+Handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
// the file licenses/APL2.txt.
//

// Reference: <https://docs.couchbase.com/sync-gateway/current/rest-api.html>

#include "RESTListener.hh"
#include "c4Private.h"
#include "c4Collection.hh"
Expand Down Expand Up @@ -42,6 +44,7 @@ namespace litecore::REST {
}

void RESTListener::handleActiveTasks(RequestResponse& rq) {
rq.uncacheable();
auto& json = rq.jsonEncoder();
json.writeArray([&] {
for ( auto& task : tasks() ) {
Expand All @@ -53,6 +56,8 @@ namespace litecore::REST {
#pragma mark - DATABASE HANDLERS:

void RESTListener::handleGetDatabase(RequestResponse& rq, C4Collection* coll) {
if ( rq.method() == HEAD ) return;

C4Database* db = coll->getDatabase();
optional<string> dbName = nameOfDatabase(db);
if ( !dbName ) return rq.respondWithStatus(HTTPStatus::NotFound);
Expand Down Expand Up @@ -165,6 +170,7 @@ namespace litecore::REST {

// Enumerate, building JSON:
JSONEncoder json;
int64_t rows = 0;
while ( e.next() ) {
if ( skip-- > 0 ) continue;
else if ( limit-- <= 0 )
Expand All @@ -181,8 +187,9 @@ namespace litecore::REST {
rq.write(json.finish());
rq.write("\n");
rq.flush(32768);
++rows;
}
rq.write("]}");
rq.printf("],\n\"total_rows\":%lld,\"update_seq\":%llu}", rows, coll->getLastSequence());
}

void RESTListener::handleGetDoc(RequestResponse& rq, C4Collection* coll) {
Expand All @@ -191,7 +198,7 @@ namespace litecore::REST {
Retained<C4Document> doc = coll->getDocument(docID, true, (revID.empty() ? kDocGetCurrentRev : kDocGetAll));
if ( doc ) {
if ( revID.empty() ) {
if ( doc->flags() & kDocDeleted ) doc = nullptr;
if ( doc->flags() & kDocDeleted ) doc = nullptr; // Don't return a tombstone unless rev given
else
revID = doc->revID().asString();
} else {
Expand All @@ -200,11 +207,20 @@ namespace litecore::REST {
}
if ( !doc ) return rq.respondWithStatus(HTTPStatus::NotFound);

// Use the revID as the HTTP ETag for conditional GETs:
string eTag = format("\"%s\"", revID.c_str());
if ( slice inm = rq.header("If-None-Match"); inm == eTag ) {
return rq.respondWithStatus(HTTPStatus::NotModified);
}
rq.setHeader("Etag", eTag);
rq.setHeader("Content-Type", "application/json");

if ( rq.method() == HEAD ) return;

// Get the revision
alloc_slice json = doc->bodyAsJSON(false);

// Splice the _id and _rev into the start of the JSON:
rq.setHeader("Content-Type", "application/json");
rq.write(R"({"_id":")");
rq.write(docID);
rq.write(R"(","_rev":")");
Expand All @@ -218,6 +234,7 @@ namespace litecore::REST {
} else {
rq.write("}");
}
rq.write("\n");
}

// Core code for create/update/delete operation on a single doc.
Expand Down
2 changes: 1 addition & 1 deletion REST/RESTListener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace litecore::REST {
static int kTaskExpirationTime = 10;


string RESTListener::kServerName = "LiteCoreServ";
string RESTListener::kServerName = "CouchbaseLite";

string RESTListener::serverNameAndVersion() {
alloc_slice version(c4_getVersion());
Expand Down

0 comments on commit 301b4af

Please sign in to comment.