From 3673c5e73399eed0ddc65fcbe430611dd9fee492 Mon Sep 17 00:00:00 2001
From: benbdeitch
Date: Fri, 13 Sep 2024 18:11:19 -0400
Subject: [PATCH] First draft of new trending scores for solr.

---
 conf/solr/conf/managed-schema.xml           |  38 +++++++
 openlibrary/solr/solr_types.py              |  63 ++++++-----
 openlibrary/solr/update.py                  |   4 +
 openlibrary/solr/updater/work.py            |   3 +
 openlibrary/utils/solr.py                   |  10 ++
 scripts/calculate_trending_scores_daily.py  |  52 +++++++++
 scripts/calculate_trending_scores_hourly.py | 118 +++++++++++++++++---
 7 files changed, 244 insertions(+), 44 deletions(-)
 create mode 100644 scripts/calculate_trending_scores_daily.py

diff --git a/conf/solr/conf/managed-schema.xml b/conf/solr/conf/managed-schema.xml
index a1c0b23daac0..11bbfad44c3c 100644
--- a/conf/solr/conf/managed-schema.xml
+++ b/conf/solr/conf/managed-schema.xml
@@ -200,6 +200,44 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
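The field definitions in the schema hunk above were lost when the XML markup was stripped; only the 38 added lines survive as bare markers. Going by the new keys declared in solr_types.py below, the hunk adds the 24 trending_score_hourly_* slots, trending_score_hourly_sum, the 7 trending_score_daily_* slots, and trending_z_score. A minimal sketch of what such definitions could look like, assuming plain point-numeric fields set up for Solr in-place atomic updates (single-valued, non-indexed, non-stored, docValues-backed); the exact types and attributes in the real managed-schema.xml may differ:

<!-- Hypothetical sketch only; the attribute choices are assumptions, not the actual patch content. -->
<field name="trending_score_hourly_0" type="pint" default="0" indexed="false" stored="false" docValues="true"/>
<!-- ...trending_score_hourly_1 through trending_score_hourly_23 follow the same pattern... -->
<field name="trending_score_hourly_sum" type="pint" default="0" indexed="false" stored="false" docValues="true"/>
<field name="trending_score_daily_0" type="pint" default="0" indexed="false" stored="false" docValues="true"/>
<!-- ...trending_score_daily_1 through trending_score_daily_6 follow the same pattern... -->
<field name="trending_z_score" type="pfloat" default="0" indexed="false" stored="false" docValues="true"/>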
diff --git a/openlibrary/solr/solr_types.py b/openlibrary/solr/solr_types.py
index d0de9d9d9028..3768cabf810f 100644
--- a/openlibrary/solr/solr_types.py
+++ b/openlibrary/solr/solr_types.py
@@ -90,36 +90,37 @@ class SolrDocument(TypedDict):
     lending_edition_s: Optional[str]
     ia_collection_s: Optional[str]
     ebook_count_i: Optional[int]
-    trending_hourly_0: Optional[int]
-    trending_hourly_1: Optional[int]
-    trending_hourly_2: Optional[int]
-    trending_hourly_3: Optional[int]
-    trending_hourly_4: Optional[int]
-    trending_hourly_5: Optional[int]
-    trending_hourly_6: Optional[int]
-    trending_hourly_7: Optional[int]
-    trending_hourly_8: Optional[int]
-    trending_hourly_9: Optional[int]
-    trending_hourly_10: Optional[int]
-    trending_hourly_11: Optional[int]
-    trending_hourly_12: Optional[int]
-    trending_hourly_13: Optional[int]
-    trending_hourly_14: Optional[int]
-    trending_hourly_15: Optional[int]
-    trending_hourly_16: Optional[int]
-    trending_hourly_17: Optional[int]
-    trending_hourly_18: Optional[int]
-    trending_hourly_19: Optional[int]
-    trending_hourly_20: Optional[int]
-    trending_hourly_21: Optional[int]
-    trending_hourly_22: Optional[int]
-    trending_hourly_23: Optional[int]
-    trending_daily_0: Optional[int]
-    trending_daily_1: Optional[int]
-    trending_daily_2: Optional[int]
-    trending_daily_3: Optional[int]
-    trending_daily_4: Optional[int]
-    trending_daily_5: Optional[int]
-    trending_daily_6: Optional[int]
+    trending_score_hourly_0: Optional[int]
+    trending_score_hourly_1: Optional[int]
+    trending_score_hourly_2: Optional[int]
+    trending_score_hourly_3: Optional[int]
+    trending_score_hourly_4: Optional[int]
+    trending_score_hourly_5: Optional[int]
+    trending_score_hourly_6: Optional[int]
+    trending_score_hourly_7: Optional[int]
+    trending_score_hourly_8: Optional[int]
+    trending_score_hourly_9: Optional[int]
+    trending_score_hourly_10: Optional[int]
+    trending_score_hourly_11: Optional[int]
+    trending_score_hourly_12: Optional[int]
+    trending_score_hourly_13: Optional[int]
+    trending_score_hourly_14: Optional[int]
+    trending_score_hourly_15: Optional[int]
+    trending_score_hourly_16: Optional[int]
+    trending_score_hourly_17: Optional[int]
+    trending_score_hourly_18: Optional[int]
+    trending_score_hourly_19: Optional[int]
+    trending_score_hourly_20: Optional[int]
+    trending_score_hourly_21: Optional[int]
+    trending_score_hourly_22: Optional[int]
+    trending_score_hourly_23: Optional[int]
+    trending_score_hourly_sum: Optional[int]
+    trending_score_daily_0: Optional[int]
+    trending_score_daily_1: Optional[int]
+    trending_score_daily_2: Optional[int]
+    trending_score_daily_3: Optional[int]
+    trending_score_daily_4: Optional[int]
+    trending_score_daily_5: Optional[int]
+    trending_score_daily_6: Optional[int]
     trending_z_score: Optional[float]
     # fmt: on
diff --git a/openlibrary/solr/update.py b/openlibrary/solr/update.py
index 47c16e07c8db..901fb9086210 100644
--- a/openlibrary/solr/update.py
+++ b/openlibrary/solr/update.py
@@ -53,6 +53,10 @@ def can_update_key(key: str) -> bool:
     return any(updater.key_test(key) for updater in get_solr_updaters())


+async def in_place_update():
+    pass
+
+
 async def update_keys(
     keys: list[str],
     commit=True,
diff --git a/openlibrary/solr/updater/work.py b/openlibrary/solr/updater/work.py
index 719ea56185fe..2c64d26a1995 100644
--- a/openlibrary/solr/updater/work.py
+++ b/openlibrary/solr/updater/work.py
@@ -684,6 +684,9 @@ def build_trending_scores(self) -> dict:
             )
             for index in range(24)
         }
+        doc |= {
+            "trending_score_hourly_sum": self._work.get("trending_score_hourly_sum", 0)
+        }
         doc |= {
             f'trending_score_daily_{index}': self._work.get(
                 f'trending_score_daily_{index}', 0
diff --git a/openlibrary/utils/solr.py b/openlibrary/utils/solr.py
index ca68c3dd0c4b..0aa56f369183 100644
--- a/openlibrary/utils/solr.py
+++ b/openlibrary/utils/solr.py
@@ -79,6 +79,16 @@ def get_many(
         ).json()
         return [doc_wrapper(doc) for doc in resp['response']['docs']]

+    def update_in_place(
+        self,
+        request,
+    ):
+        resp = requests.post(
+            f'{self.base_url}/update?update.partial.requireInPlace=true&commit=true',
+            json=request,
+        ).json()
+        return resp
+
     def select(
         self,
         query,
diff --git a/scripts/calculate_trending_scores_daily.py b/scripts/calculate_trending_scores_daily.py
new file mode 100644
index 000000000000..5edeb3c9698f
--- /dev/null
+++ b/scripts/calculate_trending_scores_daily.py
@@ -0,0 +1,52 @@
+import _init_path
+import os
+from openlibrary.config import load_config
+from openlibrary.core import db
+from openlibrary.plugins.worksearch.code import execute_solr_query
+import datetime
+
+from openlibrary.plugins.worksearch.search import get_solr
+
+
+def fetch_works(current_day: int):
+    resp = execute_solr_query(
+        '/export',
+        {
+            "q": f'trending_score_hourly_sum:[1 TO *] OR trending_score_daily_{current_day}:[1 TO *]',
+            "fl": "key,trending_score_hourly_sum",
+            "sort": "trending_score_hourly_sum desc",
+        },
+    )
+    doc_data = {}
+    if resp:
+        data = resp.json()
+        try:
+            docs = data["response"]["docs"]
+        except KeyError:
+            raise KeyError
+        print(docs)
+        doc_data = {doc["key"]: doc.get("trending_score_hourly_sum", 0) for doc in docs}
+    return doc_data
+
+
+def form_inplace_updates(work_id: str, current_day: int, new_value: int):
+    return {"key": work_id, f'trending_score_daily_{current_day}': {"set": new_value}}
+
+
+if __name__ == '__main__':
+    from contextlib import redirect_stdout
+
+    ol_config = os.getenv("OL_CONFIG")
+    if ol_config:
+        with open(os.devnull, 'w') as devnull, redirect_stdout(devnull):
+            load_config(ol_config)
+        current_day = datetime.datetime.now().weekday()
+        work_data = fetch_works(current_day)
+        print(work_data)
+        request_body = [
+            form_inplace_updates(work_id, current_day, work_data[work_id])
+            for work_id in work_data
+        ]
+        print(request_body)
+        resp = get_solr().update_in_place(request_body)
+        print(resp)
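For reference, the list that calculate_trending_scores_daily.py builds and hands to get_solr().update_in_place() is a plain Solr atomic-update payload: one document per work, using the "set" operation on that weekday's slot. A small illustration with invented work keys and counts, assuming a Wednesday run (weekday() == 2):

# Illustrative only: the shape of the payload produced by form_inplace_updates()
# in calculate_trending_scores_daily.py; the keys and values are made up.
request_body = [
    {"key": "/works/OL123W", "trending_score_daily_2": {"set": 41}},
    {"key": "/works/OL456W", "trending_score_daily_2": {"set": 7}},
]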
diff --git a/scripts/calculate_trending_scores_hourly.py b/scripts/calculate_trending_scores_hourly.py
index d06314da96c6..4558c6c90995 100644
--- a/scripts/calculate_trending_scores_hourly.py
+++ b/scripts/calculate_trending_scores_hourly.py
@@ -1,17 +1,100 @@
 import _init_path
 import os
+import datetime
 from openlibrary.config import load_config
 from openlibrary.core import db
-from openlibrary.plugins.worksearch.code import get_solr_works
+from openlibrary.plugins.worksearch.search import get_solr
+from openlibrary.plugins.worksearch.code import execute_solr_query
+from math import sqrt

+# This script handles the hourly update of each work's trending data and z-score. The slot
+# trending_score_hourly_{current_hour}, which still holds the count from 24 hours ago, is
+# overwritten with the number of bookshelves events recorded in the last hour.

-def fetch_works(work_ids: list[int]):
-    fields = [f'trending_score_hourly_{index}' for index in range(23)]
-    fields.extend([f'trending_score_daily_{index}' for index in range(7)])
+# The trending_score_daily_* values are all fetched in order to calculate their mean and
+# standard deviation. The new z-score is then:
+#     z = (sum(trending_score_hourly_*) - mean(trending_score_daily_*)) / (stdev(trending_score_daily_*) + 1)
+# Adding 1 to the denominator serves two purposes:
+#     - It prevents divide-by-zero errors on works whose trending_score_daily_* values are all 0,
+#       which they are initialized to the first time Solr reindexes them with this new data.
+#     - It de-emphasizes such works, as a shift from 0 events to 1 event is not particularly interesting.


-def update_hourly_count_and_z_score(work_id: int, count: int):
-    pass
+def fetch_works(current_hour: int):
+    ol_db = db.get_db()
+    query = "select work_id, Count(updated) "
+    query += "from bookshelves_events group by bookshelves_events.updated, work_id "
+    query += "having updated >= localtimestamp - interval '1 hour'"
+    db_data = {
+        f'/works/OL{storage.work_id}W': storage.count
+        for storage in list(ol_db.query(query))
+    }
+    print(db_data)
+    query = f'trending_score_hourly_{current_hour}:[1 TO *] OR {" OR ".join(["key:\"" + key + "\"" for key in db_data])}'
+    print(query)
+    resp = execute_solr_query(
+        '/export',
+        {
+            "q": query,
+            "fl": ",".join(
+                [
+                    "key",
+                    "trending_score_hourly_sum",
+                    f'trending_score_hourly_{current_hour}',
+                    "trending_score_daily_*",
+                ]
+            ),
+            "sort": "trending_score_hourly_sum desc",
+        },
+    )
+    doc_data = {}
+    if resp:
+        data = resp.json()
+        try:
+            docs = data["response"]["docs"]
+
+        except KeyError:
+            raise KeyError
+        print(docs)
+        doc_data = {
+            doc["key"]: {"count": db_data.get(doc["key"], 0), "solr_doc": doc}
+            for doc in docs
+        }
+    return doc_data
+
+
+def get_z_score(solr_doc: dict, count: int, current_hour: int):
+    arith_mean = sum([solr_doc[f'trending_score_daily_{i}'] for i in range(7)]) / 7
+    last_24_hours_value = (
+        solr_doc['trending_score_hourly_sum']
+        + count
+        - solr_doc[f'trending_score_hourly_{current_hour}']
+    )
+    st_dev = sqrt(
+        sum(
+            [
+                pow(solr_doc[f'trending_score_daily_{i}'] - arith_mean, 2)
+                for i in range(7)
+            ]
+        )
+        / 7
+    )
+    return (last_24_hours_value - arith_mean) / (st_dev + 1.0)
+
+
+def form_inplace_updates(work_id: str, count: int, solr_doc: dict, current_hour: int):
+    request_body = {
+        "key": work_id,
+        f'trending_score_hourly_{current_hour}': {"set": count},
+        "trending_score_hourly_sum": {
+            "set": solr_doc["trending_score_hourly_sum"]
+            - solr_doc[f'trending_score_hourly_{current_hour}']
+            + count
+        },
+        "trending_z_score": {"set": get_z_score(solr_doc, count, current_hour)},
+    }
+
+    return request_body


 if __name__ == '__main__':
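To make the z-score arithmetic above concrete, here is a small worked example that applies the same formula as get_z_score(); the daily counts and the hourly sum are invented values, not real data:

from math import sqrt

# Invented example values for a single work.
daily = [1, 2, 0, 3, 1, 2, 5]  # trending_score_daily_0 .. trending_score_daily_6
hourly_sum = 9                 # trending_score_hourly_sum after this hour's count is folded in

mean = sum(daily) / 7                                   # 2.0
st_dev = sqrt(sum((d - mean) ** 2 for d in daily) / 7)  # ~1.51 (population standard deviation)
z = (hourly_sum - mean) / (st_dev + 1.0)                # ~2.79
print(round(z, 2))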
@@ -21,12 +104,21 @@ def update_hourly_count_and_z_score(work_id: int, count: int):
     if ol_config:
         with open(os.devnull, 'w') as devnull, redirect_stdout(devnull):
             load_config(ol_config)
-        ol_db = db.get_db()
-        query = "select work_id, Count(updated)"
-        query += "from bookshelves_events group by bookshelves_events.updated, work_id"
-        query += "having updated >= localtimestamp - interval '1 hour'"
-        print(ol_db.__dir__())
-        ids_needing_changes = list(ol_db.query(query))
-        print(ids_needing_changes[0].work_id)
+
+        current_hour = datetime.datetime.now().hour
+        work_data = fetch_works(current_hour)
+        if work_data:
+            request_body = [
+                form_inplace_updates(
+                    work_id,
+                    work_data[work_id]["count"],
+                    work_data[work_id]["solr_doc"],
+                    current_hour,
+                )
+                for work_id in work_data
+            ]
+            print(request_body)
+            resp = get_solr().update_in_place(request_body)
+            print(resp)
     else:
         print("failure")
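Each run of either script ultimately boils down to one POST of such an atomic-update list to Solr's /update handler, with the update.partial.requireInPlace and commit parameters used by Solr.update_in_place() above. A rough sketch of the equivalent request, assuming a hypothetical local base URL of http://localhost:8983/solr/openlibrary; the work key and values are invented:

import requests

# Hypothetical payload for a single work during the 14:00 hourly run.
request_body = [
    {
        "key": "/works/OL123W",
        "trending_score_hourly_14": {"set": 3},
        "trending_score_hourly_sum": {"set": 11},
        "trending_z_score": {"set": 2.79},
    }
]
resp = requests.post(
    "http://localhost:8983/solr/openlibrary/update",
    params={"update.partial.requireInPlace": "true", "commit": "true"},
    json=request_body,
    timeout=30,
).json()
print(resp)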