Skip to content

Commit

Permalink
Set update interval (1-120 days) manually for NVD API (#300)
Browse files Browse the repository at this point in the history
  • Loading branch information
oh2fih committed Jul 3, 2024
1 parent cb638fe commit c85a7fa
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 76 deletions.
61 changes: 27 additions & 34 deletions CveXplore/core/database_maintenance/main_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ def reset_download_sources_to_default(self):

return True

def update(self, update_source: str | list = None):
def update(self, update_source: str | list = None, manual_days: int = 0):
"""
Method used for updating the database
"""
self.logger.info(f"Starting Database update....")
self.logger.info(f"Starting Database update...")
start_time = time.time()

if not self.do_initialize:
Expand All @@ -80,42 +80,35 @@ def update(self, update_source: str | list = None):
)
self.database_migrator.db_upgrade()

if update_source is not None:
if not isinstance(update_source, str | list):
if update_source is None:
# update all sources
update_source = [source["name"] for source in self.sources]
elif isinstance(update_source, str):
# update a single source
update_source = [update_source]
else:
# update list of sources
if not isinstance(update_source, list):
raise ValueError("Wrong 'update_source' parameter type received!")

try:
if update_source is None:
for source in self.sources:
up = source["updater"]()
up.update()

elif isinstance(update_source, list):
for source in update_source:
try:
update_this_source = [
x for x in self.sources if x["name"] == source
][0]
up = update_this_source["updater"]()
up.update()
except IndexError:
raise UpdateSourceNotFound(
f"Provided source: {source} could not be found...."
for source in update_source:
try:
update_this_source = [x for x in self.sources if x["name"] == source][0]
up = update_this_source["updater"]()
if manual_days > 0:
if update_this_source["name"] in ("cpe", "cve"):
up.update(manual_days=manual_days)
else:
self.logger.warning(
f"Update interval in days not supported by source {source}; ignoring"
)
else:
# single string then....
try:
update_this_source = [
x for x in self.sources if x["name"] == update_source
][0]
up = update_this_source["updater"]()
up.update()
else:
up.update()
except IndexError:
raise UpdateSourceNotFound(
f"Provided source: {update_source} could not be found...."
)
except UpdateSourceNotFound:
raise
except IndexError:
raise UpdateSourceNotFound(
f"Provided source: {source} could not be found...."
)

self.database_indexer.create_indexes()

Expand Down
110 changes: 68 additions & 42 deletions CveXplore/core/database_maintenance/sources_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def process_the_item(self, item: dict = None):

return cpe

def process_downloads(self, sites: list | None = None):
def process_downloads(self, sites: list | None = None, manual_days: int = 0):
"""
Method to download and process files
"""
Expand Down Expand Up @@ -156,29 +156,40 @@ def process_downloads(self, sites: list | None = None):
f"Retrieval of api data on url: {data_list.args[0]} failed...."
)
else:
last_mod_start_date = self.database[self.feed_type.lower()].find_one(
{}, {"lastModified": 1}, sort=[("lastModified", -1)]
)
# Get datetime from runtime
last_mod_end_date = datetime.datetime.now()

if last_mod_start_date is not None:
if "lastModified" in last_mod_start_date:
last_mod_start_date = last_mod_start_date[
"lastModified"
] + datetime.timedelta(
0, 1
) # add one second to prevent false results...
else:
raise KeyError(
"Missing field 'lastModified' from database query..."
)
else:
# Use configured day interval or detect from the latest entry in the database
if manual_days > 120:
self.logger.warning(
"No records found in the mongodb cpe collection.."
f"Update interval over 120 days not supported by the NVD API; ignoring"
)
return

# Get datetime from runtime
last_mod_end_date = datetime.datetime.now()
if manual_days > 0 and manual_days <= 120:
last_mod_start_date = last_mod_end_date - datetime.timedelta(
days=manual_days
)
else:
last_mod_start_date = self.database[
self.feed_type.lower()
].find_one({}, {"lastModified": 1}, sort=[("lastModified", -1)])

if last_mod_start_date is not None:
if "lastModified" in last_mod_start_date:
last_mod_start_date = last_mod_start_date[
"lastModified"
] + datetime.timedelta(
0, 1
) # add one second to prevent false results...
else:
raise KeyError(
"Missing field 'lastModified' from database query..."
)
else:
self.logger.warning(
"No records found in the mongodb cpe collection.."
)
return
self.logger.info(f"Retrieving CPEs starting from {last_mod_start_date}")

try:
total_results = self.api_handler.get_count(
Expand Down Expand Up @@ -231,10 +242,10 @@ def process_downloads(self, sites: list | None = None):
f"Duration: {datetime.timedelta(seconds=time.time() - start_time)}"
)

def update(self, **kwargs):
def update(self, manual_days: int = 0):
self.logger.info("CPE database update started")

self.process_downloads()
self.process_downloads(manual_days=manual_days)

# if collection is non-existent; assume it's not an update
if self.feed_type.lower() not in self.getTableNames():
Expand Down Expand Up @@ -644,7 +655,7 @@ def process_the_item(self, item: dict = None):

return cve

def process_downloads(self, sites: list = None):
def process_downloads(self, sites: list = None, manual_days: int = 0):
"""
Method to download and process files
"""
Expand Down Expand Up @@ -701,25 +712,40 @@ def process_downloads(self, sites: list = None):
f"Retrieval of api data on url: {data_list.args[0]} failed...."
)
else:
last_mod_start_date = self.database[self.feed_type.lower()].find_one(
{}, {"lastModified": 1}, sort=[("lastModified", -1)]
)
# Get datetime from runtime
last_mod_end_date = datetime.datetime.now()

if last_mod_start_date is not None:
if "lastModified" in last_mod_start_date:
last_mod_start_date = last_mod_start_date["lastModified"]
else:
raise KeyError(
"Missing field 'lastModified' from database query..."
)
else:
# Use configured day interval or detect from the latest entry in the database
if manual_days > 120:
self.logger.warning(
"No records found in the mongodb cves collection.."
f"Update interval over 120 days not supported by the NVD API; ignoring"
)
return

# Get datetime from runtime
last_mod_end_date = datetime.datetime.now()
if manual_days > 0 and manual_days <= 120:
last_mod_start_date = last_mod_end_date - datetime.timedelta(
days=manual_days
)
else:
last_mod_start_date = self.database[
self.feed_type.lower()
].find_one({}, {"lastModified": 1}, sort=[("lastModified", -1)])

if last_mod_start_date is not None:
if "lastModified" in last_mod_start_date:
last_mod_start_date = last_mod_start_date[
"lastModified"
] + datetime.timedelta(
0, 1
) # add one second to prevent false results...
else:
raise KeyError(
"Missing field 'lastModified' from database query..."
)
else:
self.logger.warning(
"No records found in the mongodb cpe collection.."
)
return
self.logger.info(f"Retrieving CVEs starting from {last_mod_start_date}")

try:
total_results = self.api_handler.get_count(
Expand Down Expand Up @@ -772,10 +798,10 @@ def process_downloads(self, sites: list = None):
f"Duration: {datetime.timedelta(seconds=time.time() - start_time)}"
)

def update(self):
def update(self, manual_days: int = 0):
self.logger.info("CVE database update started")

self.process_downloads()
self.process_downloads(manual_days=manual_days)

# if collection is non-existent; assume it's not an update
if self.feed_type.lower() not in self.getTableNames():
Expand Down
8 changes: 8 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ You can add your `NIST API Key <https://nvd.nist.gov/developers/request-an-api-k
:code:`NVD_NIST_API_KEY` (e.g., in the :code:`~/.cvexplore/.env` file). You can populate CveXplore without an API key,
but it will limit the amount of parallel requests made to the NIST API.

For the NVD API, the update starts from the last modified document in the database. If CPEs or CVEs are missing
because of failures during the regular updates, you can manually re-fetch the entries modified within the last
1–120 days. (If the period is longer than 120 days, you need to re-populate the entire database.) Example:

.. code-block:: python

    >>> cvx.database.update(manual_days=7)

Package usage
-------------

Expand Down

0 comments on commit c85a7fa

Please sign in to comment.