Limit the number of records returned as per the page_size defined (#237)
* Limit the number of records returned as per the page_size defined

* Modified read and get methods to accept page_size and pagination as optional parameters

* Removed the logger message

* Added comment to the place where page_size is hardcoded

* Get page_size in client.py and don't pass as a parameter

* Moved the changes to client.py

* Changed the call to use dictionary access by key value
bhuvana-talend authored Oct 30, 2023
1 parent 16e72a9 commit 3ffab16
Showing 2 changed files with 62 additions and 35 deletions.
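Every stream getter in tests/client.py gets the same treatment in this commit: look up the stream's expected page size from the base test's expected metadata, request pages of that size, and, when the new pagination flag is set, stop once slightly more than one full page has been collected. The sketch below is a minimal illustration of that pattern, not code from the commit; expected_metadata(), EXPECTED_PAGE_SIZE, BASE_URL, and self.get() are the test-suite helpers visible in the diff, while the stream name, endpoint, and response keys are placeholders.

def get_some_stream(self, pagination=False):
    # Look up the expected page size for this stream from the base test metadata
    page_size = self.BaseTest.expected_metadata().get(
        'some_stream', {}).get(self.BaseTest.EXPECTED_PAGE_SIZE)

    url = f"{BASE_URL}/some/v1/stream/paged"  # placeholder endpoint
    params = {'count': page_size}
    records = []

    has_more = True
    while has_more:
        response = self.get(url, params=params)
        # Response keys vary per HubSpot endpoint ('results'/'hasMore'/'has-more', etc.)
        records.extend(response['results'])

        has_more = response['hasMore']
        params['offset'] = response['offset']

        # When only testing pagination, stop once we have a little more than one
        # full page -- enough to cross a page boundary without reading every record.
        if pagination and len(records) > page_size + 10:
            break

    return records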
89 changes: 60 additions & 29 deletions tests/client.py
@@ -176,7 +176,8 @@ def datatype_transformations(self, stream, records):
##########################################################################
### GET
##########################################################################
def read(self, stream, parent_ids=[], since=''):
### Accept a pagination parameter to limit pagination to the first 2 pages (TDL-16124) ###
def read(self, stream, parent_ids=[], since='', pagination=False):

# Resets the access_token if the expiry time is less than or equal to the current time
if self.CONFIG["token_expires"] <= datetime.datetime.utcnow():
@@ -187,29 +188,29 @@ def read(self, stream, parent_ids=[], since=''):
elif stream == 'owners':
return self.get_owners()
elif stream == 'companies':
return self.get_companies(since)
return self.get_companies(since, pagination)
elif stream == 'contact_lists':
return self.get_contact_lists(since)
return self.get_contact_lists(since, pagination=pagination)
elif stream == 'contacts_by_company':
return self.get_contacts_by_company(parent_ids)
return self.get_contacts_by_company(parent_ids, pagination)
elif stream == 'engagements':
return self.get_engagements()
return self.get_engagements(pagination)
elif stream == 'campaigns':
return self.get_campaigns()
elif stream == 'deals':
return self.get_deals()
elif stream == 'workflows':
return self.get_workflows()
elif stream == 'contacts':
return self.get_contacts()
return self.get_contacts(pagination)
elif stream == 'deal_pipelines':
return self.get_deal_pipelines()
elif stream == 'email_events':
return self.get_email_events()
return self.get_email_events(pagination)
elif stream == 'subscription_changes':
return self.get_subscription_changes(since)
return self.get_subscription_changes(since, pagination)
elif stream == "tickets":
return self.get_tickets()
return self.get_tickets(pagination)
else:
raise NotImplementedError

@@ -238,10 +239,11 @@ def _get_company_by_id(self, company_id):
response = self.get(url)
return response

def get_companies(self, since=''):
def get_companies(self, since='', pagination=False):
"""
Get all companies by paginating using 'hasMore' and 'offset'.
"""
page_size = self.BaseTest.expected_metadata().get('companies', {}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/companies/v2/companies/paged"
if not since:
since = self.start_date_strf
@@ -273,6 +275,8 @@ def get_companies(self, since=''):

has_more = response['has-more']
params['offset'] = response['offset']
if pagination and len(companies) > page_size+10:
break

# get the details of each company
for company in companies:
@@ -283,11 +287,12 @@ def get_companies(self, since=''):

return records

def get_contact_lists(self, since='', list_id=''):
def get_contact_lists(self, since='', list_id='', pagination=False):
"""
Get all contact_lists by paginating using 'has-more' and 'offset'.
"""
url = f"{BASE_URL}/contacts/v1/lists"
page_size = self.BaseTest.expected_metadata().get('contact_lists',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)

if list_id:
url += f"/{list_id}"
@@ -296,7 +301,7 @@ def get_contact_lists(self, since='', list_id=''):
return response

if since == 'all':
params = {'count': 250}
params = {'count': page_size}
else:
if not since:
since = self.start_date_strf
@@ -305,15 +310,14 @@ def get_contact_lists(self, since='', list_id=''):
since = datetime.datetime.strptime(since, self.START_DATE_FORMAT)

since = str(since.timestamp() * 1000).split(".")[0]
params = {'since': since, 'count': 250}
params = {'since': since, 'count': page_size}

records = []
replication_key = list(self.replication_keys['contact_lists'])[0]

# paginating through all the contact_lists
has_more = True
while has_more:

response = self.get(url, params=params)
for record in response['lists']:

@@ -322,6 +326,8 @@ def get_contact_lists(self, since='', list_id=''):

has_more = response['has-more']
params['offset'] = response['offset']
if pagination and len(records) > page_size+10:
break

return records

@@ -354,16 +360,17 @@ def _get_contacts_by_pks(self, pks):

return records[0]

def get_contacts(self):
def get_contacts(self, pagination=False):
"""
Get all contact vids by paginating using 'has-more' and 'vid-offset/vidOffset'.
Then use the vids to grab the detailed contacts records.
"""
page_size = self.BaseTest.expected_metadata().get('contacts',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url_1 = f"{BASE_URL}/contacts/v1/lists/all/contacts/all"
params_1 = {
'showListMemberships': True,
'includeVersion': True,
'count': 100,
'count': page_size,
}
vids = []
url_2 = f"{BASE_URL}/contacts/v1/contact/vids/batch/"
@@ -379,18 +386,21 @@ def get_contacts(self):
response_1 = self.get(url_1, params=params_1)
vids = [record['vid'] for record in response_1['contacts']
if record['versionTimestamp'] >= self.start_date]

has_more = response_1['has-more']
params_1['vidOffset'] = response_1['vid-offset']

# get the detailed contacts records by vids
params_2['vid'] = vids
response_2 = self.get(url_2, params=params_2)
records.extend([record for record in response_2.values()])
if pagination and len(records) > page_size+10:
break

records = self.denest_properties('contacts', records)
return records

def get_contacts_by_company(self, parent_ids):
def get_contacts_by_company(self, parent_ids, pagination=False):
"""
Get all contacts_by_company by iterating over companyIds and
paginating using 'hasMore' and 'vidOffset'. This stream is essentially
@@ -400,8 +410,9 @@ def get_contacts_by_company(self, parent_ids):
pulling the 'companyId' from each record to perform the corresponding get here.
"""

page_size = self.BaseTest.expected_metadata().get('contacts_by_company', {}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/companies/v2/companies/{{}}/vids"
params = dict()
params = {'count': page_size}
records = []

for parent_id in parent_ids:
@@ -416,8 +427,10 @@ def get_contacts_by_company(self, parent_ids):

has_more = response['hasMore']
params['vidOffset'] = response['vidOffset']
if pagination and len(records) > page_size+10:
break

params = dict()
params = {'count': page_size}

return records

@@ -512,13 +525,14 @@ def get_deals(self):
records = self.denest_properties('deals', records)
return records

def get_email_events(self, recipient=''):
def get_email_events(self, recipient='', pagination=False):
"""
Get all email_events by paginating using 'hasMore' and 'offset'.
"""
page_size = self.BaseTest.expected_metadata().get('email_events',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/email/public/v1/events"
replication_key = list(self.replication_keys['email_events'])[0]
params = dict()
params = {'count': page_size}
if recipient:
params['recipient'] = recipient
records = []
@@ -532,6 +546,8 @@ def get_email_events(self, recipient=''):

has_more = response['hasMore']
params['offset'] = response['offset']
if pagination and len(records) > page_size+10:
break

return records

@@ -549,13 +565,14 @@ def _get_engagements_by_pk(self, engagement_id):

return response

def get_engagements(self):
def get_engagements(self, pagination=False):
"""
Get all engagements by paginating using 'hasMore' and 'offset'.
"""
page_size = self.BaseTest.expected_metadata().get('engagements',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/engagements/v1/engagements/paged"
replication_key = list(self.replication_keys['engagements'])[0]
params = {'limit': 250}
params = {'limit': page_size}
records = []

has_more = True
@@ -570,6 +587,8 @@ def get_engagements(self):

has_more = response['hasMore']
params['offset'] = response['offset']
if pagination and len(records) > page_size+10:
break

return records

@@ -606,13 +625,14 @@ def get_owners(self):
transformed_records = self.datatype_transformations('owners', records)
return transformed_records

def get_subscription_changes(self, since=''):
def get_subscription_changes(self, since='', pagination=False):
"""
Get all subscription_changes from 'since' date by paginating using 'hasMore' and 'offset'.
Default since date is one week ago
"""
page_size = self.BaseTest.expected_metadata().get('subscription_changes',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/email/public/v1/subscriptions/timeline"
params = dict()
params = {'count': page_size}
records = []
replication_key = list(self.replication_keys['subscription_changes'])[0]
if not since:
@@ -632,6 +652,8 @@ def get_subscription_changes(self, since=''):
# this won't be feasible until BUG_TDL-14938 is addressed
if int(since) <= record['timestamp']:
records.append(record)
if pagination and len(records) > page_size+10:
break

return records

@@ -677,18 +699,19 @@ def get_tickets_properties(self):

return ",".join([record["name"] for record in records["results"]])

def get_tickets(self):
def get_tickets(self, pagination=False):
"""
Get all tickets.
HubSpot API https://developers.hubspot.com/docs/api/crm/tickets
"""
page_size = self.BaseTest.expected_metadata().get('tickets',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/crm/v4/objects/tickets"
replication_key = list(self.replication_keys["tickets"])[0]
records = []

# response = self.get(url)

params = {"limit": 100, "associations": "contact,company,deals", 'properties': self.get_tickets_properties()}
params = {"limit": page_size, "associations": "contact,company,deals", 'properties': self.get_tickets_properties()}
while True:
response = self.get(url, params=params)

@@ -698,6 +721,8 @@ def get_tickets(self):

if not response.get("paging"):
break
if pagination and len(records) > page_size+10:
break
params["after"] = response.get("paging").get("next").get("after")

records = self.denest_properties('tickets', records)
@@ -805,6 +830,10 @@ def create_contacts(self):
params = {'includeVersion': True}
get_resp = self.get(get_url, params=params)

created_time = get_resp.get('properties').get('createdate').get('value')
ts = int(created_time) / 1000
LOGGER.info("Created Time %s", datetime.datetime.utcfromtimestamp(ts))

converted_versionTimestamp = self.BaseTest.datetime_from_timestamp(
get_resp['versionTimestamp'] / 1000, self.BOOKMARK_DATE_FORMAT
)
@@ -880,7 +909,8 @@ def create_contacts_by_company(self, company_ids=[], contact_records=[], times=1
if not company_ids:
company_ids = [company['companyId'] for company in self.get_companies()]
if not contact_records:
contact_records = self.get_contacts()
page_size = self.BaseTest.expected_metadata().get('contacts_by_company',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
contact_records = self.get_contacts(page_size)

records = []
for _ in range(times):
@@ -1041,7 +1071,8 @@ def create_engagements(self):
record_uuid = str(uuid.uuid4()).replace('-', '')

# gather all contacts and randomly choose one that has not hit the limit
contact_records = self.get_contacts()
page_size = self.BaseTest.expected_metadata().get('engagements',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
contact_records = self.get_contacts(page_size)
contact_ids = [contact['vid']
for contact in contact_records
if contact['vid'] != 2304] # contact 2304 has hit the 10,000 assoc limit
8 changes: 2 additions & 6 deletions tests/test_hubspot_pagination.py
@@ -46,15 +46,12 @@ def setUp(self):

# generate test data if necessary, one stream at a time
for stream in streams:

# Get all records
if stream == 'contacts_by_company':
company_ids = [company['companyId'] for company in existing_records['companies']]
existing_records[stream] = test_client.read(stream, parent_ids=company_ids)
elif stream in {'companies', 'contact_lists', 'subscription_changes', 'engagements', 'email_events'}:
existing_records[stream] = test_client.read(stream)
existing_records[stream] = test_client.read(stream, parent_ids=company_ids, pagination=True)
else:
existing_records[stream] = test_client.read(stream)
existing_records[stream] = test_client.read(stream, pagination=True)

# check if we exceed the pagination limit
LOGGER.info(f"Pagination limit set to - {limits[stream]} and total number of existing record - {len(existing_records[stream])}")
Expand Down Expand Up @@ -92,7 +89,6 @@ def streams_to_test(self):
'email_events',
'subscription_changes', # BUG_TDL-14938 https://jira.talendforge.org/browse/TDL-14938
})

return streams_to_test

def test_run(self):
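With these changes, test_hubspot_pagination.py reads every stream through the limited path, so setUp can tell whether a stream already holds more than one page of data without syncing the whole account. A rough usage sketch follows; the TestClient constructor argument and the limits dict are assumed from the surrounding test file and are not part of this diff.

test_client = TestClient(start_date='2023-01-01T00:00:00.000000Z')  # illustrative start date

# pagination=True makes the client stop shortly after exceeding the stream's
# expected page size, so this call stays cheap even on large accounts.
records = test_client.read('contacts', pagination=True)

if len(records) <= limits['contacts']:
    # Not enough records to cross a page boundary -- setUp would create more test data here.
    ...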
