Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

7429/feature/add trending score to solr #9878

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions conf/solr/conf/managed-schema.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,44 @@
<field name="ratings_count_4" type="pint"/>
<field name="ratings_count_5" type="pint"/>


<!-- Trending related values-->
<field name = "trending_score_hourly_0" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_1" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_2" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_3" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_4" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_5" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_6" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_7" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_8" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_9" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_10" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_11" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_12" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_13" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_14" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_15" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_16" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_17" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_18" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_19" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_20" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_21" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_22" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_23" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_hourly_sum" type="pint" indexed = "false" stored = "false" />
<field name = "trending_score_daily_0" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_daily_1" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_daily_2" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_daily_3" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_daily_4" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_daily_5" type="pint" indexed="false" stored ="false"/>
<field name = "trending_score_daily_6" type="pint" indexed="false" stored ="false"/>
<field name = "trending_z_score" type="pfloat" indexed="false" stored ="false"/>



<!-- Reading Log -->
<field name="readinglog_count" type="pint"/>
<field name="want_to_read_count" type="pint"/>
Expand Down
34 changes: 33 additions & 1 deletion openlibrary/solr/solr_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,37 @@ class SolrDocument(TypedDict):
lending_edition_s: Optional[str]
ia_collection_s: Optional[str]
ebook_count_i: Optional[int]

trending_score_hourly_0: Optional[int]
trending_score_hourly_1: Optional[int]
trending_score_hourly_2: Optional[int]
trending_score_hourly_3: Optional[int]
trending_score_hourly_4: Optional[int]
trending_score_hourly_5: Optional[int]
trending_score_hourly_6: Optional[int]
trending_score_hourly_7: Optional[int]
trending_score_hourly_8: Optional[int]
trending_score_hourly_9: Optional[int]
trending_score_hourly_10: Optional[int]
trending_score_hourly_11: Optional[int]
trending_score_hourly_12: Optional[int]
trending_score_hourly_13: Optional[int]
trending_score_hourly_14: Optional[int]
trending_score_hourly_15: Optional[int]
trending_score_hourly_16: Optional[int]
trending_score_hourly_17: Optional[int]
trending_score_hourly_18: Optional[int]
trending_score_hourly_19: Optional[int]
trending_score_hourly_20: Optional[int]
trending_score_hourly_21: Optional[int]
trending_score_hourly_22: Optional[int]
trending_score_hourly_23: Optional[int]
trending_score_hourly_sum: Optional[int]
trending_score_daily_0: Optional[int]
trending_score_daily_1: Optional[int]
trending_score_daily_2: Optional[int]
trending_score_daily_3: Optional[int]
trending_score_daily_4: Optional[int]
trending_score_daily_5: Optional[int]
trending_score_daily_6: Optional[int]
trending_z_score: Optional[float]
# fmt: on
4 changes: 4 additions & 0 deletions openlibrary/solr/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def can_update_key(key: str) -> bool:
return any(updater.key_test(key) for updater in get_solr_updaters())


async def in_place_update():
pass


async def update_keys(
keys: list[str],
commit=True,
Expand Down
26 changes: 24 additions & 2 deletions openlibrary/solr/updater/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def build(self) -> SolrDocument:
doc |= self.build_legacy_ia_fields()
doc |= self.build_ratings() or {}
doc |= self.build_reading_log() or {}

doc |= self.build_trending_scores() or {}
return cast(SolrDocument, doc)

@property
Expand Down Expand Up @@ -654,7 +654,11 @@ def build_identifiers(self) -> dict[str, list[str]]:
identifiers[k] += v
return dict(identifiers)

def build_subjects(self) -> dict:
@property
def trending_z_score(self) -> float:
return self._work.get("trending_z_score", 0)

def build_subjects(self) -> dict[str, int]:
doc: dict = {}
field_map = {
'subjects': 'subject',
Expand All @@ -673,6 +677,24 @@ def build_subjects(self) -> dict:
}
return doc

def build_trending_scores(self) -> dict:
doc: dict = {
f'trending_score_hourly_{index}': self._work.get(
f'trending_score_hourly_{index}', 0
)
for index in range(24)
}
doc |= {
"trending_score_hourly_sum": self._work.get("trending_score_hourly_sum", 0)
}
doc |= {
f'trending_score_daily_{index}': self._work.get(
f'trending_score_daily_{index}', 0
)
for index in range(7)
}
return doc


def get_edition_ddcs(ed: dict):
ddcs: list[str] = ed.get('dewey_decimal_class', [])
Expand Down
10 changes: 10 additions & 0 deletions openlibrary/utils/solr.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ def get_many(
).json()
return [doc_wrapper(doc) for doc in resp['response']['docs']]

def update_in_place(
self,
request,
):
resp = requests.post(
f'{self.base_url}/update?update.partial.requireInPlace=true,commit=true',
json=request,
).json()
return resp

def select(
self,
query,
Expand Down
52 changes: 52 additions & 0 deletions scripts/calculate_trending_scores_daily.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import _init_path
import os
from openlibrary.config import load_config
from openlibrary.core import db
from openlibrary.plugins.worksearch.code import execute_solr_query
import datetime

from openlibrary.plugins.worksearch.search import get_solr


def fetch_works(current_day: int):
resp = execute_solr_query(
'/export',
{
"q": f'trending_score_hourly_sum:[1 TO *] OR trending_score_daily_{current_day}:[1 TO *]',
"fl": "key,trending_score_hourly_sum",
"sort": "trending_score_hourly_sum desc",
},
)
doc_data = {}
if resp:
data = resp.json()
try:
docs = data["response"]["docs"]
except KeyError:
raise KeyError
print(docs)
doc_data = {doc["key"]: doc.get("trending_score_hourly_sum", 0) for doc in docs}
return doc_data


def form_inplace_updates(work_id: str, current_day: int, new_value: int):
return {"key": work_id, f'trending_score_daily_{current_day}': {"set": new_value}}


if __name__ == '__main__':
from contextlib import redirect_stdout

ol_config = os.getenv("OL_CONFIG")
if ol_config:
with open(os.devnull, 'w') as devnull, redirect_stdout(devnull):
load_config(ol_config)
current_day = datetime.datetime.now().weekday()
work_data = fetch_works(current_day)
print(work_data)
request_body = [
form_inplace_updates(work_id, current_day, work_data[work_id])
for work_id in work_data
]
print(request_body)
resp = get_solr().update_in_place(request_body)
print(resp)
124 changes: 124 additions & 0 deletions scripts/calculate_trending_scores_hourly.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import _init_path
import os
import datetime
from openlibrary.config import load_config
from openlibrary.core import db
from openlibrary.plugins.worksearch.search import get_solr
from openlibrary.plugins.worksearch.code import execute_solr_query
from math import sqrt

# This script handles hourly updating of each works' z-score. The 'trending_score_hourly_23' field is
# ignored, and the current count of bookshelves events in the last hour is the new trending_score_hourly[0]
# value, with each other value incrementing up by one.

# Trending_score_daily values are all fetched in order to calculate their mean and standard deviation.
# The formula for the new z-score is [ z = sum(trending_score_hourly_*) - mean(trending_score_daily_*) / (standard_deviation(trending_score_daily)] + 1).
# The incrementation of the denominator by 1 serves several purposes:
# - It prevents divide-by-zero errors on objects with 0s in all trending_score_daily_* values, which is important,
# as they are instantiated to that upon the first time solr reindexes them with this new information.
# - It serves to deemphasize such values, as we're not particularly interested in a shift from
# 0 events to 1 event.


def fetch_works(current_hour: int):
ol_db = db.get_db()
query = "select work_id, Count(updated)"
query += "from bookshelves_events group by bookshelves_events.updated, work_id "
query += "having updated >= localtimestamp - interval '1 hour'"
db_data = {
f'/works/OL{storage.work_id}W': storage.count
for storage in list(ol_db.query(query))
}
print(db_data)
query = f'trending_score_hourly_{current_hour}:[1 TO *] OR {" OR ".join(["key:\"" + key + "\"" for key in db_data])}'
print(query)
resp = execute_solr_query(
'/export',
{
"q": query,
"fl": ",".join(
[
"key",
"trending_score_hourly_sum",
f'trending_score_hourly_{current_hour}',
"trending_score_daily_*",
]
),
"sort": "trending_score_hourly_sum desc",
},
)
doc_data = {}
if resp:
data = resp.json()
try:
docs = data["response"]["docs"]

except KeyError:
raise KeyError
print(docs)
doc_data = {
doc["key"]: {"count": db_data.get(doc["key"], 0), "solr_doc": doc}
for doc in docs
}
return doc_data


def get_z_score(solr_doc: dict, count: int, current_hour: int):
arith_mean = sum([solr_doc[f'trending_score_daily_{i}'] for i in range(7)]) / 7
benbdeitch marked this conversation as resolved.
Show resolved Hide resolved
last_24_hours_value = (
solr_doc['trending_score_hourly_sum']
+ count
- solr_doc[f'trending_score_hourly_{current_hour}']
)
st_dev = sqrt(
sum(
[
pow(solr_doc[f'trending_score_daily_{i}'] - arith_mean, 2)
for i in range(7)
]
)
/ 7
)
return (last_24_hours_value - arith_mean) / (st_dev + 1.0)
benbdeitch marked this conversation as resolved.
Show resolved Hide resolved


def form_inplace_updates(work_id: str, count: int, solr_doc: dict, current_hour: int):
request_body = {
"key": work_id,
f'trending_score_hourly_{current_hour}': {"set": count},
"trending_score_hourly_sum": {
"set": solr_doc["trending_score_hourly_sum"]
- solr_doc[f'trending_score_hourly_{current_hour}']
+ count
},
"trending_z_score": {"set": get_z_score(solr_doc, count, current_hour)},
}

return request_body


if __name__ == '__main__':
from contextlib import redirect_stdout

ol_config = os.getenv("OL_CONFIG")
if ol_config:
with open(os.devnull, 'w') as devnull, redirect_stdout(devnull):
load_config(ol_config)

current_hour = datetime.datetime.now().hour
work_data = fetch_works(current_hour)
if work_data:
request_body = [
form_inplace_updates(
work_id,
work_data[work_id]["count"],
work_data[work_id]["solr_doc"],
current_hour,
)
for work_id in work_data
]
print(request_body)
resp = get_solr().update_in_place(request_body)
print(resp)
else:
print("failure")
Loading