Skip to content

Commit

Permalink
chore(data-warehouse): add salesforce orders (#25037)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
EDsCODE and github-actions[bot] committed Sep 20, 2024
1 parent f52bde5 commit c1fd7ce
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 22 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0476_alter_integration_sensitive_config
posthog: 0477_datawarehouse_salesforce_order
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
35 changes: 35 additions & 0 deletions posthog/migrations/0477_datawarehouse_salesforce_order.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Generated by Django 4.2.15 on 2024-09-17 21:01

from django.db import migrations, connection


def insert_salesforce_order_schemas(apps, schema_editor):
with connection.cursor() as cursor:
cursor.execute("SELECT id, team_id FROM posthog_externaldatasource where source_type = 'Salesforce'")
salesforce_sources = cursor.fetchall()

ExternalDataSchema = apps.get_model("posthog", "ExternalDataSchema")
for source in salesforce_sources:
schema = ExternalDataSchema.objects.create(
name="Order",
source_id=source[0],
team_id=source[1],
should_sync=False,
sync_type=None,
sync_type_config={},
)
schema.save()


def reverse(apps, _):
pass


class Migration(migrations.Migration):
dependencies = [
("posthog", "0476_alter_integration_sensitive_config"),
]

operations = [
migrations.RunPython(insert_salesforce_order_schemas, reverse),
]
34 changes: 24 additions & 10 deletions posthog/temporal/data_imports/pipelines/salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"User": {
"name": "User",
"table_name": "user",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": "replace",
"endpoint": {
"data_selector": "records",
Expand All @@ -26,7 +26,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"UserRole": {
"name": "UserRole",
"table_name": "user_role",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": "replace",
"endpoint": {
"data_selector": "records",
Expand All @@ -40,7 +40,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Lead": {
"name": "Lead",
"table_name": "lead",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": "replace",
"endpoint": {
"data_selector": "records",
Expand All @@ -54,7 +54,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Contact": {
"name": "Contact",
"table_name": "contact",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": "replace",
"endpoint": {
"data_selector": "records",
Expand All @@ -68,7 +68,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Campaign": {
"name": "Campaign",
"table_name": "campaign",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": "replace",
"endpoint": {
"data_selector": "records",
Expand All @@ -82,7 +82,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Product2": {
"name": "Product2",
"table_name": "product2",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": "replace",
"endpoint": {
"data_selector": "records",
Expand All @@ -96,7 +96,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Pricebook2": {
"name": "Pricebook2",
"table_name": "pricebook2",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": "replace",
"endpoint": {
"data_selector": "records",
Expand All @@ -110,7 +110,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"PricebookEntry": {
"name": "PricebookEntry",
"table_name": "pricebook_entry",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": "replace",
"endpoint": {
"data_selector": "records",
Expand All @@ -121,10 +121,24 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
},
"table_format": "delta",
},
"Order": {
"name": "Order",
"table_name": "order",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": "replace",
"endpoint": {
"data_selector": "records",
"path": "/services/data/v61.0/query",
"params": {
"q": "SELECT FIELDS(STANDARD) FROM Order",
},
},
"table_format": "delta",
},
"Account": {
"name": "Account",
"table_name": "account",
"primary_key": "Id",
**({"primary_key": "Id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down Expand Up @@ -194,7 +208,7 @@ def salesforce_source(
"paginator": SalesforceEndpointPaginator(instance_url=instance_url),
},
"resource_defaults": {
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
},
"resources": [get_resource(endpoint, is_incremental)],
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
INCREMENTAL_ENDPOINTS = ("Account",)

ENDPOINTS = [
*("User", "UserRole", "Lead", "Contact", "Campaign", "Product2", "Pricebook2", "PricebookEntry"),
*("User", "UserRole", "Lead", "Contact", "Campaign", "Product2", "Pricebook2", "PricebookEntry", "Order"),
*INCREMENTAL_ENDPOINTS,
]

Expand Down
20 changes: 10 additions & 10 deletions posthog/temporal/data_imports/pipelines/vitally/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Organizations": {
"name": "Organizations",
"table_name": "organizations",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down Expand Up @@ -42,7 +42,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Accounts": {
"name": "Accounts",
"table_name": "accounts",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down Expand Up @@ -70,7 +70,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Users": {
"name": "Users",
"table_name": "users",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down Expand Up @@ -98,7 +98,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Conversations": {
"name": "Conversations",
"table_name": "conversations",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down Expand Up @@ -126,7 +126,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Notes": {
"name": "Notes",
"table_name": "notes",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down Expand Up @@ -154,7 +154,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Projects": {
"name": "Projects",
"table_name": "projects",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down Expand Up @@ -182,7 +182,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Tasks": {
"name": "Tasks",
"table_name": "tasks",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down Expand Up @@ -210,7 +210,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"NPS_Responses": {
"name": "NPS_Responses",
"table_name": "nps_responses",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down Expand Up @@ -238,7 +238,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"Custom_Objects": {
"name": "Custom_Objects",
"table_name": "custom_objects",
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down Expand Up @@ -336,7 +336,7 @@ def vitally_source(
"paginator": VitallyPaginator(),
},
"resource_defaults": {
"primary_key": "id",
**({"primary_key": "id"} if is_incremental else {}),
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
Expand Down

0 comments on commit c1fd7ce

Please sign in to comment.