-
Notifications
You must be signed in to change notification settings - Fork 1
/
helpers.py
233 lines (172 loc) · 6.38 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
"""Helper functions for creating paths and input normalization.
Primarily used by the nozomi API functions for generating the appropriate paths to files, and
ensuring that queries are made in a particular format used by the website.
If this package grows more complex, the functionality can be divided in a more modular manner.
Given the simplicity of the current API, there isn't really a point in doing so right now.
"""
import json
import re
import os
from typing import ForwardRef
from collections import defaultdict
from exceptions import InvalidTagFormat, InvalidUrlFormat
# Stand-in for the MediaMetaData type so it can be named in annotations in this
# module without importing it here (prevents circular dependency issues).
MediaMetaData = ForwardRef("MediaMetaData")
# Module-level tally of tags across all posts of a specific group. Because it is
# a defaultdict(int), a tag that has not been counted yet reads as 0.
# The three dictionary helpers that follow (load, save, merge) are used together.
tag_counts = defaultdict(int)
def _parse_dictionary_text(content):
    """Parse dictionary file content given as JSON or as ``tag: count`` lines.

    Args:
        content: The full text of a dictionary file.

    Raises:
        ValueError: If the content is neither a JSON object nor well-formed
            ``tag: count`` lines.

    Returns:
        The mapping of tag to count described by the content.
    """
    try:
        parsed = json.loads(content)
    except ValueError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    counts = {}
    for line in content.split("\n"):
        if not line.strip():
            continue
        # Split on the LAST ": " so tags that themselves contain ": " survive.
        tag, separator, count = line.rpartition(": ")
        if not separator:
            raise ValueError(f"malformed dictionary line: {line!r}")
        counts[tag] = int(count)
    return counts


def load_dictionary(file_path):
    """Load a previously saved tag-count dictionary, if one exists.

    Two on-disk layouts are understood: a JSON object, and the plain
    ``tag: count`` lines written by ``save_dictionary``.

    A file whose content cannot be parsed is deleted so that the next save
    starts from a clean state. A missing file is not an error.

    Args:
        file_path: Path of the dictionary file.

    Returns:
        The stored mapping of tag to count, or an empty dict when the file is
        missing, unreadable, or not a valid dictionary.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            return _parse_dictionary_text(file.read())
    except FileNotFoundError:
        # Nothing has been saved yet: start from an empty dictionary.
        return {}
    except OSError as e:
        # The file exists but cannot be read (permissions, directory, ...);
        # leave it alone rather than deleting something we could not inspect.
        print("I received an error loading the dictionary:", e)
        return {}
    except ValueError as e:
        # Bad JSON, a malformed line, or undecodable bytes: the file is corrupt.
        print("I received an error loading the dictionary:", e)

    try:
        os.remove(file_path)
    except OSError as e:
        print("I received an error loading the dictionary:", e)
    else:
        print("deleted dictionary", file_path)
    return {}
def save_dictionary(dictionary, file_path):
    """Write the dictionary to ``file_path`` as one ``tag: count`` line per entry.

    Any existing file at that path is overwritten.

    Args:
        dictionary: Mapping of tag to count.
        file_path: Destination path of the dictionary file.
    """
    with open(file_path, "w", encoding="utf-8") as out:
        out.writelines(
            f"{tag}: {count}\n" for tag, count in dictionary.items()
        )
def merge_dictionaries(dictionary1, dictionary2):
    """Combine two tag-count dictionaries into a new one.

    Counts for tags present in both inputs are added together; tags present in
    only one input are carried over unchanged. ``dictionary1`` itself is not
    modified, the result is built on a shallow copy of it.

    Args:
        dictionary1: The existing dictionary.
        dictionary2: The dictionary whose entries are merged in.

    Returns:
        The merged dictionary.
    """
    combined = dictionary1.copy()
    for tag, count in dictionary2.items():
        if tag not in combined:
            combined[tag] = count
            continue
        combined[tag] += count
    return combined
def sanitize_tag(tag: str) -> str:
    """Remove and replace any invalid characters in the tag.

    The tag is lower-cased, the characters ``/``, ``#`` and ``%`` are removed, and
    surrounding whitespace is trimmed.

    Args:
        tag: The search tag.

    Raises:
        InvalidTagFormat: If the sanitized tag is empty or begins with '-'.

    Returns:
        A tag in a valid format.
    """
    # Trim AFTER removing the forbidden characters so that whitespace exposed by
    # their removal (e.g. "cat #" -> "cat ") does not leak into the result.
    sanitized_tag = re.sub(r"[/#%]", "", tag.lower()).strip()
    # Validation errors propagate to the caller unchanged.
    _validate_tag_sanitized(sanitized_tag)
    return sanitized_tag
def parse_post_id(url: str) -> int:
    """Parse the post ID.

    The ID is the text between ``post/`` and ``.html`` in the URL.

    Args:
        url: The URL of the post.

    Raises:
        InvalidUrlFormat: If the URL does not contain a ``post/<id>.html`` segment
            or the ID in that segment is not an integer.

    Returns:
        The ID of the post.
    """
    match = re.search(r"post/([\s\S]*?)\.html", url)
    if match is None:
        raise InvalidUrlFormat(f"The provided URL {url} could not be parsed.")
    try:
        return int(match.group(1))
    except ValueError as err:
        # A non-numeric ID is also an unparsable URL; report it the documented way.
        raise InvalidUrlFormat(
            f"The provided URL {url} could not be parsed."
        ) from err
def create_media_filepath(media: MediaMetaData) -> str:  # type: ignore
    """Build the URL of a post's media file.

    Videos use the ``v`` subdomain with the media's own type as the extension,
    gifs use ``g`` with ``gif``, and everything else uses ``w`` with ``webp``.

    Args:
        media: The media on a post.

    Returns:
        The URL of the post's media.
    """
    if media.is_video:
        host_prefix, extension = "v", media.type
    elif media.type == "gif":
        host_prefix, extension = "g", "gif"
    else:
        host_prefix, extension = "w", "webp"

    hashed = _calculate_post_filepath(media.dataid)
    template = "https://{subdomain}.nozomi.la/{hashed_path}.{url_type}"
    return template.format(
        subdomain=host_prefix, hashed_path=hashed, url_type=extension
    )
def create_tag_filepath(sanitized_tag: str) -> str:
    """Build the URL of the .nozomi file that belongs to a search tag.

    Each search tag/term has a .nozomi file stored in the database holding
    references to the data related to that tag; this function produces the
    location of that file.

    Args:
        sanitized_tag: The sanitized search tag.

    Raises:
        InvalidTagFormat: If the tag was not sanitized before creating a tag filepath.

    Returns:
        The URL of the search tag's associated .nozomi file.
    """
    try:
        _validate_tag_sanitized(sanitized_tag)
        return f"https://j.nozomi.la/nozomi/{_encode_tag(sanitized_tag)}.nozomi"
    except InvalidTagFormat:
        # Replace the validator's message with one aimed at callers of this function.
        raise InvalidTagFormat("Tag must be sanitized before creating a filepath.")
def create_post_filepath(post_id: int) -> str:
    """Build the URL of a post's JSON file.

    The path layout follows the rules in the site's javascript file. The JSON
    file for the post contains a variety of useful data including image data,
    tags, etc.

    Args:
        post_id: The ID of a post on the website.

    Returns:
        The URL of the post's associated JSON file.
    """
    path = _calculate_post_filepath(str(post_id))
    return f"https://j.nozomi.la/post/{path}.json"
def _calculate_post_filepath(id: str) -> str:
    """Calculate the filepath for data on a post.

    IDs shorter than three characters are returned as-is. Longer IDs are placed
    under two directory levels taken from their tail:
    ``<last char>/<the two chars before it>/<full id>``.

    Args:
        id: Hash of a media file or the post id.

    Returns:
        The URL path of a post's associated file.
    """
    if len(id) < 3:
        return id
    # Plain slicing instead of a regex substitution: the id is never interpreted
    # as a replacement template, so backslashes in it cannot break the result.
    return f"{id[-1]}/{id[-3:-1]}/{id}"
def _validate_tag_sanitized(tag: str) -> None:
    """Check that a search tag is in sanitized form.

    Args:
        tag: The search tag.

    Raises:
        InvalidTagFormat: If the tag is an empty string or begins with an invalid character.
    """
    problem = None
    if not tag:
        problem = f"The tag '{tag}' is invalid. Cannot be empty."
    elif tag[0] == "-":
        problem = f"The tag '{tag}' is invalid. Cannot begin with character '-'"

    if problem is not None:
        raise InvalidTagFormat(problem)
def _encode_tag(sanitized_tag: str) -> str:
    """Encode a sanitized tag using Nozomi's custom urlencoder.

    Each of the characters ``; / ? : @ = &`` is replaced by ``%`` followed by
    its code point in lowercase hexadecimal; all other characters are kept.

    Args:
        sanitized_tag: The sanitized search tag.

    Returns:
        The encoded sanitized search tag.
    """

    def percent_hex(found):
        code_point = ord(found.group(0))
        return "%" + format(code_point, "x")

    return re.sub("[;/?:@=&]", percent_hex, sanitized_tag)
def save_ids_to_file(ids, filename):
    """Write each id to ``filename`` on its own line, overwriting the file.

    Args:
        ids: Iterable of ids; each is converted with ``str`` before writing.
        filename: Path of the file to write.
    """
    with open(filename, "w") as out:
        out.writelines(str(entry) + "\n" for entry in ids)
def remove_duplicates(ids1, ids2):
    """Return the ids from ``ids2`` that do not appear in ``ids1``.

    Each id is returned at most once, in the order of its first occurrence in
    ``ids2``, so the result is deterministic.

    Args:
        ids1: Ids that are already known.
        ids2: Candidate ids.

    Returns:
        A list of the ids in ``ids2`` that are not in ``ids1``, without repeats.
    """
    seen = set(ids1)
    fresh = []
    for candidate in ids2:
        if candidate not in seen:
            # Remember it so repeats inside ids2 are dropped as well.
            seen.add(candidate)
            fresh.append(candidate)
    return fresh