Skip to content
This repository has been archived by the owner on Sep 1, 2022. It is now read-only.

Convert meet data to incremental additions #112

Merged
merged 4 commits into the base branch from the source branch
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions google_classroom/endpoints/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pandas as pd
from tenacity import stop_after_attempt, wait_exponential, Retrying
from sqlalchemy.schema import DropTable
from sqlalchemy.exc import NoSuchTableError, InvalidRequestError, DataError
from sqlalchemy.exc import NoSuchTableError, DataError
from timer import elapsed
import endpoints

Expand Down Expand Up @@ -47,7 +47,7 @@ def return_all_data(self):
return pd.read_sql_table(
self.table_name, con=self.sql.engine, schema=self.sql.schema
)
except InvalidRequestError as error:
except ValueError as error:
logging.debug(error)
return None

Expand Down Expand Up @@ -304,12 +304,7 @@ def differences_between_frames(self, df1, df2, left_on, right_on):
both: A dataframe containing data found in both df1 and df2.
"""
merged = pd.merge(
df1,
df2,
left_on=left_on,
right_on=right_on,
how="outer",
indicator=True,
df1, df2, left_on=left_on, right_on=right_on, how="outer", indicator=True,
)
left_only = merged[merged["_merge"] == "left_only"].reset_index(drop=True)
right_only = merged[merged["_merge"] == "right_only"].reset_index(drop=True)
Expand Down
3 changes: 1 addition & 2 deletions google_classroom/endpoints/course.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ def __init__(self, service, sql, config):

def request_data(self, course_id=None, date=None, next_page_token=None):
    """Build the API request that lists courses for this endpoint.

    The `course_id` and `date` parameters are accepted for interface
    parity with the other endpoints' `request_data` signatures but are
    not used by the courses listing call. Returns the unexecuted request
    object produced by the service client.
    """
    courses_resource = self.service.courses()
    return courses_resource.list(
        pageToken=next_page_token,
        pageSize=self.config.PAGE_SIZE,
    )

def filter_data(self, dataframe):
Expand Down
24 changes: 24 additions & 0 deletions google_classroom/endpoints/meet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from endpoints.base import EndPoint
from datetime import timedelta
import logging


class Meet(EndPoint):
Expand All @@ -22,6 +24,7 @@ def __init__(self, service, sql, config):
]
self.request_key = "items"
self.batch_size = config.MEET_BATCH_SIZE
self.last_date = None

def request_data(self, course_id=None, date=None, next_page_token=None):
"""Request Google Meet events (currently only call_ended)"""
Expand All @@ -31,6 +34,27 @@ def request_data(self, course_id=None, date=None, next_page_token=None):
"eventName": "call_ended",
"pageToken": next_page_token,
}
# Meet data is added incrementally, because the data is very large. However,
# the data Google provides is not always fully up-to-date. As a result, the
# most reliable way to obtain incremental updated data is to drop the last
# 24 hours of data and then request data from Google starting at that point.
if next_page_token is None:
# Only set the last_date on the first request.
data = self.return_all_data()
if data is not None and data.item_time.count() > 0:
last_date = data.item_time.max()
table = self.sql.table(self.table_name)
last_date = last_date - timedelta(hours=24)
delete_query = table.delete().where(table.c.item_time > last_date)
self.sql.engine.execute(delete_query)
last_date = last_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
logging.debug(f"{self.classname()}: pulling data from {last_date}.")
self.last_date = last_date
else:
self.last_date = None
if self.last_date:
options["startTime"] = self.last_date

return self.service.activities().list(**options)

def preprocess_records(self, records):
Expand Down
2 changes: 1 addition & 1 deletion google_classroom/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def pull_data(config, creds, sql):

# Get Meet data
if config.PULL_MEET:
Meet(admin_reports_service, sql, config).batch_pull_data()
Meet(admin_reports_service, sql, config).batch_pull_data(overwrite=False)


def sync_all_data(config, creds, sql):
Expand Down