diff --git a/REST/RESTListener+Changes.cc b/REST/RESTListener+Changes.cc index 70715279e..837c2a858 100644 --- a/REST/RESTListener+Changes.cc +++ b/REST/RESTListener+Changes.cc @@ -9,6 +9,10 @@ // the file licenses/APL2.txt. // +// Reference: +// https://docs.couchbase.com/sync-gateway/current/rest-api.html#/Database_Management/get_keyspace__changes +// https://docs.couchdb.org/en/stable/api/database/changes.html + #include "RESTListener.hh" #include "Error.hh" #include "Timer.hh" @@ -28,8 +32,6 @@ using namespace fleece; namespace litecore::REST { using namespace net; - // API documentation: https://docs.couchdb.org/en/stable/api/database/changes.html - static constexpr uint64_t kDefaultHeartbeatMS = 60000; static constexpr uint64_t kMinHeartbeatMS = 5000; static constexpr uint64_t kMaxHeartbeatMS = 60000; @@ -61,9 +63,10 @@ namespace litecore::REST { //TODO: ?doc_ids, ?filter } - ~ChangesTask() { c4log(ListenerLog, kC4LogInfo, "Destructing ChangesTask %p", this); } + ~ChangesTask() { c4log(ListenerLog, kC4LogVerbose, "ChangesTask %p deleted", this); } void runImmediate() { + _rq.uncacheable(); if ( _feed == unknown ) { _rq.respondWithStatus(HTTPStatus::BadRequest, "unsupported feed type"); _rq.finish(); @@ -89,6 +92,7 @@ namespace litecore::REST { if ( _feed == continuous || (_feed == longpoll && _sent == 0 && _limit > 0) ) { // In continuous mode, we always wait for more changes. // In longpoll mode, we wait if there are no changes yet. + _rq.flush(); wait(); } else { _rq.printf("], \"last_seq\":%llu}", uint64_t(_lastSequence)); @@ -112,7 +116,8 @@ namespace litecore::REST { { unique_lock lock(_mutex); if ( !_rq.finished() ) { - if ( _feed != continuous ) _rq.printf("\n], \"last_seq\":%llu}", uint64_t(_lastSequence)); + if ( _feed != continuous && !_rq.socketError() ) + _rq.printf("\n], \"last_seq\":%llu}", uint64_t(_lastSequence)); _rq.finish(); } } @@ -145,25 +150,30 @@ namespace litecore::REST { // Set a heartbeat timer that sends a newline periodically: uint64_t intervalMS = kDefaultHeartbeatMS; if ( hb != "true" ) intervalMS = _rq.uintQuery("heartbeat", kDefaultHeartbeatMS); - intervalMS = std::max(intervalMS, kMinHeartbeatMS); - intervalMS = std::min(intervalMS, kMaxHeartbeatMS); - chrono::milliseconds interval(intervalMS); - _heartbeatTimer.emplace([this, interval] { this->heartbeat(interval); }); - _heartbeatTimer->fireAfter(interval); - } else { + if ( intervalMS > 0 ) { + intervalMS = std::max(intervalMS, kMinHeartbeatMS); + intervalMS = std::min(intervalMS, kMaxHeartbeatMS); + chrono::milliseconds interval(intervalMS); + _heartbeatTimer.emplace([this, interval] { this->heartbeat(interval); }); + _heartbeatTimer->fireAfter(interval); + } + } + if ( !_heartbeatTimer ) { // Or set a timeout timer to stop the task: auto timeout = _rq.uintQuery("timeout", kDefaultTimeoutMS); - timeout = std::min(timeout, kMaxTimeoutMS); + if ( timeout == 0 ) timeout = kMaxTimeoutMS; + timeout = std::min(timeout, kMaxTimeoutMS); _timeoutTimer.emplace([this] { this->timeout(); }); _timeoutTimer->fireAfter(chrono::milliseconds(timeout)); } - c4log(ListenerLog, kC4LogInfo, "ChangesTask %p waiting...", this); + c4log(ListenerLog, kC4LogVerbose, "ChangesTask %p waiting...", this); registerTask(); } // Called when the collection has changedd: void observeChange() { - c4log(ListenerLog, kC4LogInfo, "ChangesTask %p got changes!", this); + c4log(ListenerLog, kC4LogVerbose, "ChangesTask %p got changes!", this); + bool doStop = false; { unique_lock lock(_mutex); if ( _rq.finished() ) return; @@ -196,22 +206,28 @@ namespace litecore::REST { writeChange(info, body); } } + if ( _feed == continuous ) _rq.flush(); + doStop = (_feed == longpoll && _sent > 0) || _rq.socketError() != kC4NoError; } - if ( _feed == continuous ) _rq.flush(); - else if ( _sent > 0 ) { stop(); } + if ( doStop ) { stop(); } } void heartbeat(chrono::milliseconds interval) { - c4log(ListenerLog, kC4LogInfo, "ChangesTask %p heartbeat", this); - unique_lock lock(_mutex); - if ( _rq.finished() ) return; - _rq.write("\n"); - _rq.flush(); - _heartbeatTimer->fireAfter(interval); + c4log(ListenerLog, kC4LogVerbose, "ChangesTask %p heartbeat", this); + bool doStop = false; + { + unique_lock lock(_mutex); + if ( _rq.finished() ) return; + _rq.write("\n"); + _rq.flush(); + _heartbeatTimer->fireAfter(interval); + doStop = _rq.socketError() != kC4NoError; + } + if ( doStop ) { stop(); } } void timeout() { - c4log(ListenerLog, kC4LogInfo, "ChangesTask %p timed out", this); + c4log(ListenerLog, kC4LogVerbose, "ChangesTask %p timed out", this); { unique_lock lock(_mutex); if ( _rq.finished() ) return; diff --git a/REST/RESTListener+Handlers.cc b/REST/RESTListener+Handlers.cc index f279c1ae4..80c93c083 100644 --- a/REST/RESTListener+Handlers.cc +++ b/REST/RESTListener+Handlers.cc @@ -10,6 +10,8 @@ // the file licenses/APL2.txt. // +// Reference: + #include "RESTListener.hh" #include "c4Private.h" #include "c4Collection.hh" @@ -42,6 +44,7 @@ namespace litecore::REST { } void RESTListener::handleActiveTasks(RequestResponse& rq) { + rq.uncacheable(); auto& json = rq.jsonEncoder(); json.writeArray([&] { for ( auto& task : tasks() ) { @@ -53,6 +56,8 @@ namespace litecore::REST { #pragma mark - DATABASE HANDLERS: void RESTListener::handleGetDatabase(RequestResponse& rq, C4Collection* coll) { + if ( rq.method() == HEAD ) return; + C4Database* db = coll->getDatabase(); optional dbName = nameOfDatabase(db); if ( !dbName ) return rq.respondWithStatus(HTTPStatus::NotFound); @@ -165,6 +170,7 @@ namespace litecore::REST { // Enumerate, building JSON: JSONEncoder json; + int64_t rows = 0; while ( e.next() ) { if ( skip-- > 0 ) continue; else if ( limit-- <= 0 ) @@ -181,8 +187,9 @@ namespace litecore::REST { rq.write(json.finish()); rq.write("\n"); rq.flush(32768); + ++rows; } - rq.write("]}"); + rq.printf("],\n\"total_rows\":%lld,\"update_seq\":%llu}", rows, coll->getLastSequence()); } void RESTListener::handleGetDoc(RequestResponse& rq, C4Collection* coll) { @@ -191,7 +198,7 @@ namespace litecore::REST { Retained doc = coll->getDocument(docID, true, (revID.empty() ? kDocGetCurrentRev : kDocGetAll)); if ( doc ) { if ( revID.empty() ) { - if ( doc->flags() & kDocDeleted ) doc = nullptr; + if ( doc->flags() & kDocDeleted ) doc = nullptr; // Don't return a tombstone unless rev given else revID = doc->revID().asString(); } else { @@ -200,11 +207,20 @@ namespace litecore::REST { } if ( !doc ) return rq.respondWithStatus(HTTPStatus::NotFound); + // Use the revID as the HTTP ETag for conditional GETs: + string eTag = format("\"%s\"", revID.c_str()); + if ( slice inm = rq.header("If-None-Match"); inm == eTag ) { + return rq.respondWithStatus(HTTPStatus::NotModified); + } + rq.setHeader("Etag", eTag); + rq.setHeader("Content-Type", "application/json"); + + if ( rq.method() == HEAD ) return; + // Get the revision alloc_slice json = doc->bodyAsJSON(false); // Splice the _id and _rev into the start of the JSON: - rq.setHeader("Content-Type", "application/json"); rq.write(R"({"_id":")"); rq.write(docID); rq.write(R"(","_rev":")"); @@ -218,6 +234,7 @@ namespace litecore::REST { } else { rq.write("}"); } + rq.write("\n"); } // Core code for create/update/delete operation on a single doc. diff --git a/REST/RESTListener.cc b/REST/RESTListener.cc index cd581d1b2..291fcc746 100644 --- a/REST/RESTListener.cc +++ b/REST/RESTListener.cc @@ -35,7 +35,7 @@ namespace litecore::REST { static int kTaskExpirationTime = 10; - string RESTListener::kServerName = "LiteCoreServ"; + string RESTListener::kServerName = "CouchbaseLite"; string RESTListener::serverNameAndVersion() { alloc_slice version(c4_getVersion());