Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Arxivce-1756] cit to gcp sync fixes #726

Merged
merged 10 commits into from
Sep 19, 2024
Merged
14 changes: 10 additions & 4 deletions script/sync_prod_to_gcp/submissions_to_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import subprocess
import sys
import typing
from datetime import timedelta, datetime
from datetime import timedelta, datetime, timezone
from urllib.parse import unquote
from time import gmtime, sleep
from pathlib import Path
Expand Down Expand Up @@ -692,6 +692,10 @@ def is_ignored(self) -> bool:
if content.startswith(needle):
return True

except UnicodeDecodeError:
logger.info("Unparsable source, but no worries. if it's more than auto-ignore, it should not be ignored.: %s ext %s",
self.xid.ids, str(self.src_ext), extra=self.log_extra)
return False
except Exception as _exc:
logger.warning("bad .gz: %s ext %s",
self.xid.ids, str(self.src_ext), extra=self.log_extra,
Expand Down Expand Up @@ -734,6 +738,8 @@ def submission_message_to_file_state(data: dict, log_extra: dict, ask_webnode: b
paper_id = data.get('paper_id')
version = data.get('version')
src_ext = data.get('src_ext')
if src_ext and len(src_ext) > 0 and src_ext[0] != ".":
src_ext = "." + src_ext
source_format = data.get('source_format', '')

logger.info("Processing %s.v%s:%s", paper_id, str(version), str(src_ext), extra=log_extra)
Expand Down Expand Up @@ -858,7 +864,7 @@ def trash_bucket_objects(gs_client, objects: typing.List[str], log_extra: dict):
blob = bucket.blob(obj)
logger.debug("%s is being deleted", obj, extra=log_extra)
blob.delete()
logger.warning("%s is deleted", obj, extra=log_extra)
logger.info("%s is deleted", obj, extra=log_extra)
return


Expand Down Expand Up @@ -915,7 +921,7 @@ def submission_callback(message: Message) -> None:
"publish_type": str(publish_type), "arxiv_id": str(paper_id), "version": str(version)
}

message_age: timedelta = datetime.utcnow() - message.publish_time
message_age: timedelta = datetime.utcnow().replace(tzinfo=timezone.utc) - message.publish_time
compilation_timeout = int(os.environ.get("TEX_COMPILATION_TIMEOUT_MINUTES", "30"))

try:
Expand Down Expand Up @@ -1034,7 +1040,7 @@ def sync_to_gcp(state: SubmissionFilesState, log_extra: dict) -> bool:
logger.debug("Skipping [%s]: %s -> %s", entry_type, local, remote, extra=log_extra)
continue

logger.debug("uploading [%s]: %s -> %s", entry_type, local, remote, extra=log_extra)
logger.info("uploading [%s]: %s -> %s", entry_type, local, remote, extra=log_extra)
local_path = Path(local)
if local_path.exists() and local_path.is_file():
upload(gs_client, local_path, remote, upload_logger=logger)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
------------------------------------------------------------------------------
\\
arXiv:2409.03427
From: Mauricio Bustamante <foo@example.com>
Date: Thu, 5 Sep 2024 11:15:22 GMT (5kb)

Title: The Giant Radio Array for Neutrino Detection (GRAND) Collaboration --
Contributions to the 10th International Workshop on Acoustic and Radio EeV
Neutrino Detection Activities (ARENA 2024)
Authors: Rafael Alves Batista, Aur\'elien Benoit-L\'evy, Teresa Bister, Martina
Bohacova, Mauricio Bustamante, Washington Carvalho, Yiren Chen, LingMei
Cheng, Simon Chiche, Jean-Marc Colley, Pablo Correa, Nicoleta Cucu Laurenciu,
Zigao Dai, Rogerio M. de Almeida, Beatriz de Errico, Sijbrand de Jong, Jo\~ao
R. T. de Mello Neto, Krijn D de Vries, Valentin Decoene, Peter B. Denton,
Bohao Duan, Kaikai Duan, Ralph Engel, William Erba, Yizhong Fan, Ars\`ene
Ferri\`ere, QuanBu Gou, Junhua Gu, Marion Guelfand, Jianhua Guo, Yiqing Guo,
Claire Gu\'epin, Lukas G\"ulzow, Andreas Haungs, Matej Havelka, Haoning He,
Eric Hivon, Hongbo Hu, Xiaoyuan Huang, Yan Huang, Tim Huege, Wen Jiang,
Ramesh Koirala, ChuiZheng Kong, Kumiko Kotera, Jelena K\"ohler, Bruno L.
Lago, Zhisen Lai, Sandra Le Coz, Fran\c{c}ois Legrand, Antonios Leisos, Rui
Li, Xingyu Li, YiFei Li, Cheng Liu, Ruoyu Liu, Wei Liu, Pengxiong Ma, Oscar
Macias, Fr\'ed\'eric Magnard, Alexandre Marcowith, Olivier Martineau-Huynh,
Thomas McKinley, Paul Minodier, Pragati Mitra, Miguel Mostaf\'a, Kohta
Murase, Valentin Niess, Stavros Nonis, Shoichi Ogio, Foteini Oikonomou,
Hongwei Pan, Konstantinos Papageorgiou, Tanguy Pierog, Lech Wiktor
Piotrowski, Simon Prunet, Xiangli Qian, Markus Roth, Takashi Sako, Harm
Schoorlemmer, D\'aniel Sz\'alas-Motesiczky, Szymon S{\l}awi\'nski, Xishui
Tian, Anne Timmermans, Charles Timmermans, Petr Tobiska, Apostolos
Tsirigotis, Mat\'ias Tueros, George Vittakis, Hanrui Wang, Jiale Wang, Shen
Wang, Xiangyu Wang, Xu Wang, Daming Wei, Feng Wei, Xiangping Wu, Xuefeng Wu,
Xin Xu, Xing Xu, Fufu Yang, Lili Yang, Xuan Yang, Qiang Yuan, Philippe Zarka,
Houdun Zeng, Chao Zhang, Jianli Zhang, Kewen Zhang, Pengfei Zhang, Qingchi
Zhang, Songbo Zhang, Yi Zhang, Hao Zhou (for the GRAND Collaboration),
Stephanie Wissel, Andrew Zeolla, Cosmin Deaconu, Valentin Decoene, Kaeli
Hughes, Zachary Martin, Katharine Mulrey, Austin Cummings (for the BEACON
Collaboration), Oliver Kr\"omer, Kathryn Plant, Frank G. Schroeder
Categories: astro-ph.IM astro-ph.HE hep-ex hep-ph
Comments: Note: To access the list of contributions, please follow the "HTML"
link that can be found on the arXiv page
License: http://arxiv.org/licenses/nonexclusive-distrib/1.0/
\\
This is an index of the contributions by the Giant Radio Array for Neutrino
Detection (GRAND) Collaboration to the 10th International Workshop on Acoustic
and Radio EeV Neutrino Detection Activities (ARENA 2024, University of Chicago,
June 11-14, 2024). The contributions include an overview of GRAND in its
present and future incarnations, methods of radio-detection that are being
developed for them, and ongoing joint work between the GRAND and BEACON
experiments.
\\
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
------------------------------------------------------------------------------
\\
arXiv:2409.10667
From: Test User <test@example.com>
Date: Mon, 16 Sep 2024 19:04:47 GMT (1425kb)

Title: Benchmarking Secure Sampling Protocols for Differential Privacy
Authors: Test User
Categories: cs.CR
Comments: This is the full version (18 pages) of the paper Benchmarking Secure
Sampling Protocols for Differential Privacy published at CCS'2024
License: http://arxiv.org/licenses/nonexclusive-distrib/1.0/
\\
Differential privacy (DP) is widely employed to provide privacy protection
for individuals by limiting information leakage from the aggregated data. Two
well-known models of DP are the central model and the local model. The former
requires a trustworthy server for data aggregation, while the latter requires
individuals to add noise, significantly decreasing the utility of aggregated
results. Recently, many studies have proposed to achieve DP with Secure
Multi-party Computation (MPC) in distributed settings, namely, the distributed
model, which has utility comparable to central model while, under specific
security assumptions, preventing parties from obtaining others' information.
One challenge of realizing DP in distributed model is efficiently sampling
noise with MPC. Although many secure sampling methods have been proposed, they
have different security assumptions and isolated theoretical analyses. There is
a lack of experimental evaluations to measure and compare their performances.
We fill this gap by benchmarking existing sampling protocols in MPC and
performing comprehensive measurements of their efficiency. First, we present a
taxonomy of the underlying techniques of these sampling protocols. Second, we
extend widely used distributed noise generation protocols to be resilient
against Byzantine attackers. Third, we implement discrete sampling protocols
and align their security settings for a fair comparison. We then conduct an
extensive evaluation to study their efficiency and utility.
\\
Binary file not shown.
76 changes: 63 additions & 13 deletions script/sync_prod_to_gcp/test/test_submissions_to_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
TIMEOUTS["PDF_TIMEOUT"] = 3
TIMEOUTS["HTML_TIMEOUT"] = 3


def trim_test_dir(fileset):
for idx in range(len(fileset)):
fileset[idx]['cit'] = fileset[idx]['cit'][len(test_dir):]
Expand Down Expand Up @@ -84,7 +85,8 @@ def do_GET(self):

# PosixPath('/home/ntai/arxiv/arxiv-browse/script/sync_prod_to_gcp/test/cache/ps_cache/arxiv/html/2308/2308.99990v1')
for suffix in [".html.gz", ".tar.gz"]:
if paper_id.has_version:
# 2409.03427 is under /ftp
if paper_id.has_version and paper_id.id != "2409.03427":
source_path = os.path.join(test_dir, "data", "orig", "arxiv", "papers",
paper_id.yymm,
f"{paper_id.idv}{suffix}")
Expand Down Expand Up @@ -154,6 +156,7 @@ def bucket_object_exists(obj: str) -> bool:

arxivce_1756_obsolete = "gs://arxiv-sync-test-01/ftp/arxiv/papers/1907/1907.07431.gz"


class TestSubmissionsToGCP(unittest.TestCase):

@classmethod
Expand Down Expand Up @@ -183,6 +186,7 @@ def setUp(self) -> None:
"gs://arxiv-sync-test-01/ps_cache/arxiv/html/2308/2308.99990v1/2308.99990v1.html",
"gs://arxiv-sync-test-01/orig/arxiv/papers/1907/1907.07431v2.abs",
"gs://arxiv-sync-test-01/orig/arxiv/papers/1907/1907.07431v2.gz",
"gs://arxiv-sync-test-01/ps_cache/arxiv/pdf/2409/2409.10667v1.pdf"
]
subprocess.call(rm_items + droplets,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
Expand All @@ -204,13 +208,18 @@ def setUp(self) -> None:
if os.path.exists(html_path):
shutil.rmtree(html_path)

html_path_2409_03427v1 = os.path.join(
sync_published_to_gcp.PS_CACHE_PREFIX, "arxiv", "html", "2409", "2409.03427v1")
if os.path.exists(html_path_2409_03427v1):
shutil.rmtree(html_path_2409_03427v1)

# test_arxivce_1756
# Thess are the v2 abs, and .gz as being replaced, and copied under /ftp
subprocess.call(["gsutil", "cp", "test/data/orig/arxiv/papers/1907/1907.07431v2.abs",
"gs://arxiv-sync-test-01/ftp/arxiv/papers/1907/1907.07431.abs"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.call(["gsutil", "cp", "test/data/ftp/arxiv/papers/1907/1907.07431.gz",
"gs://arxiv-sync-test-01/ftp/arxiv/papers/1907/1907.07431.gz"],
"gs://arxiv-sync-test-01/ftp/arxiv/papers/1907/1907.07431.gz"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
pass

Expand Down Expand Up @@ -273,6 +282,30 @@ def test_sync_op(self):
self.assertEqual("141", get_file_size(
f"gs://arxiv-sync-test-01/ftp/arxiv/papers/2308/{paper_id}.tar.gz"))

def test_html_submission_2409_03427(self):
file_state = submission_message_to_file_state(
{"type": "new", "paper_id": "2409.03427", "version": 1, "src_ext": ".html.gz"}, {},
ask_webnode=True)
expected = trim_test_dir(file_state.get_expected_files())
self.assertEqual([
{'cit': '/data/ftp/arxiv/papers/2409/2409.03427.abs',
'gcp': 'ftp/arxiv/papers/2409/2409.03427.abs',
'status': 'current',
'type': 'abstract'},
{'cit': '/data/ftp/arxiv/papers/2409/2409.03427.html.gz',
'gcp': 'ftp/arxiv/papers/2409/2409.03427.html.gz',
'status': 'current',
'type': 'submission'},
{'cit': '/cache/ps_cache/arxiv/html/2409/2409.03427v1',
'gcp': 'ps_cache/arxiv/html/2409/2409.03427v1',
'status': 'current',
'type': 'html-cache'},
{'cit': '/cache/ps_cache/arxiv/html/2409/2409.03427v1/2409.03427.html',
'gcp': 'ps_cache/arxiv/html/2409/2409.03427v1/2409.03427.html',
'status': 'current',
'type': 'html-files'}
], expected)

def test_ask_pdf(self):
paper_id = "2308.16190"
data = {
Expand Down Expand Up @@ -516,10 +549,10 @@ def test_arxivce_1756(self):
'status': 'obsolete',
'type': 'abstract',
'version': 2},
{'cit': '/data/orig/arxiv/papers/1907/1907.07431v2.gz',
'gcp': 'orig/arxiv/papers/1907/1907.07431v2.gz',
{'cit': '/data/orig/arxiv/papers/1907/1907.07431v2.gz',
'gcp': 'orig/arxiv/papers/1907/1907.07431v2.gz',
'obsoleted': 'ftp/arxiv/papers/1907/1907.07431.gz',
'original': 'orig/arxiv/papers/1907/1907.07431v2.gz',
'original': 'orig/arxiv/papers/1907/1907.07431v2.gz',
'status': 'obsolete',
'type': 'submission',
'version': 2}],
Expand All @@ -544,6 +577,25 @@ def test_arxivce_1756(self):
# because the obsolete file is deleted
self.assertFalse(bucket_object_exists("gs://arxiv-sync-test-01/ftp/arxiv/papers/1907/1907.07431.gz"))

def test_new_pdf(self):
paper_id = "2409.10667"
test_data = {"type": "new", "paper_id": paper_id, "version": "1", "src_ext": ".pdf"}
file_state = submission_message_to_file_state(test_data, {}, ask_webnode=True)
expected = trim_test_dir(file_state.get_expected_files())
self.assertEqual([
{'cit': f'/data/ftp/arxiv/papers/2409/{paper_id}.abs',
'gcp': f'ftp/arxiv/papers/2409/{paper_id}.abs',
'status': 'current',
'type': 'abstract'},
{'cit': f'/data/ftp/arxiv/papers/2409/{paper_id}.pdf',
'gcp': f'ftp/arxiv/papers/2409/{paper_id}.pdf',
'status': 'current',
'type': 'submission'}
], expected)
# This should not be created
self.assertFalse(bucket_object_exists(
f"gs://arxiv-sync-test-01/ps_cache/arxiv/pdf/2409/{paper_id}v1/{paper_id}v1.pdf"))


class TestPayloadToMeta(unittest.TestCase):

Expand All @@ -564,8 +616,7 @@ def test_new(self):
'gcp': 'ps_cache/arxiv/pdf/2308/2308.99991v1.pdf',
'status': 'current',
'type': 'pdf-cache'}
], expected)

], expected)

def test_wdr(self):
# withdrawal -
Expand Down Expand Up @@ -593,7 +644,7 @@ def test_wdr(self):
'status': 'obsolete',
'type': 'submission',
'version': 1}
], expected)
], expected)

def test_jref_1(self):
test_data = {"type": "jref", "paper_id": "2308.99994", "version": "2", "src_ext": ".pdf"}
Expand Down Expand Up @@ -627,7 +678,7 @@ def test_cross(self):
'gcp': 'ps_cache/arxiv/html/2308/2308.99995v2',
'status': 'current',
'type': 'html-cache'}
], expected)
], expected)

def test_jref_2(self):
test_data = {"type": "jref", "paper_id": "2308.99996", "version": "2", "src_ext": ".tar.gz"}
Expand All @@ -646,8 +697,7 @@ def test_jref_2(self):
'gcp': 'ps_cache/arxiv/pdf/2308/2308.99996v2.pdf',
'status': 'current',
'type': 'pdf-cache'}
], expected)

], expected)

def test_submission_message_to_payloads(self):
test_data = {"type": "rep", "paper_id": "physics/0106051", "version": "3", "src_ext": ".gz"}
Expand Down Expand Up @@ -681,7 +731,7 @@ def test_submission_message_to_payloads(self):
'status': 'obsolete',
'type': 'submission',
'version': 2}
], payloads)
], payloads)

test_data = {"type": "rep", "paper_id": "physics/0106051", "version": 3, "src_ext": ".gz"}
file_state = submission_message_to_file_state(test_data, {}, ask_webnode=False)
Expand Down Expand Up @@ -715,7 +765,7 @@ def test_submission_message_to_payloads(self):
'status': 'obsolete',
'type': 'submission',
'version': 2}
],
],
payloads)

def test_arxivce_1756(self):
Expand Down
6 changes: 4 additions & 2 deletions script/sync_prod_to_gcp/webnode_pdf_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import subprocess
import threading
import typing
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from pathlib import Path
from time import gmtime, sleep

Expand Down Expand Up @@ -112,6 +112,8 @@ def ping_callback(message: Message) -> None:
version = data.get('version')
arxiv_id_str = f'{paper_id}v{version}' if version else paper_id
src_ext: typing.Union[str, None] = data.get('src_ext')
if src_ext and len(src_ext) > 0 and src_ext[0] != ".":
src_ext = "." + src_ext
log_extra["arxiv_id"] = arxiv_id_str
log_extra["src_ext"] = str(src_ext)

Expand All @@ -128,7 +130,7 @@ def ping_callback(message: Message) -> None:
message.ack()
return

message_age: timedelta = datetime.utcnow() - message.publish_time
message_age: timedelta = datetime.utcnow().replace(tzinfo=timezone.utc) - message.publish_time
compilation_timeout = int(os.environ.get("TEX_COMPILATION_TIMEOUT_MINUTES", "30"))
if message_age > timedelta(minutes=compilation_timeout):
help_needed = os.environ.get("TEX_COMPILATION_RECIPIENT", "help@arxiv.org")
Expand Down
Loading