Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop an interface to access data tables from an external notebook or other outside analysis environment #127

Open
MattTriano opened this issue Apr 24, 2023 · 0 comments

Comments

@MattTriano
Copy link
Owner

I'm not sure whether this belongs in this repo or elsewhere, but it covers the core functionality needed for local development. Outside local development, the interface should probably require the user to supply the connection string explicitly rather than reading it from the project's env file.

from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import inspect, text

class DWHInspector:
    """Helper for exploring the data warehouse (DWH) database from a notebook
    or another analysis environment running outside the container network.

    The SQLAlchemy engine is built from a connection URI stored in a
    ``KEY=VALUE`` env file (by default the ``AIRFLOW_CONN_DWH_DB_CONN`` entry).
    The URI is rewritten so it is reachable from the local machine.
    """

    def __init__(self, env_path: Path):
        self.engine = self.create_engine_from_env_file(env_path)

    def create_engine_from_env_file(
        self,
        env_path: Path,
        conn_var: str = "AIRFLOW_CONN_DWH_DB_CONN",
        container_host: str = "dwh_db:5432",
        local_host: str = "localhost:5431",
    ):
        """Build a SQLAlchemy engine from the DWH connection URI in *env_path*.

        Args:
            env_path: Path (or path string) to a file of ``KEY=VALUE`` lines.
            conn_var: Name of the variable holding the connection URI.
            container_host: ``host:port`` as written in the URI; its first
                occurrence is replaced by *local_host*.
            local_host: ``host:port`` at which the DWH is reachable from here.

        Returns:
            A SQLAlchemy ``Engine``.

        Raises:
            FileNotFoundError: If *env_path* is not an existing file.
            KeyError: If *conn_var* is not defined in the file.
        """
        env_path = Path(env_path)
        if not env_path.is_file():
            # Fail loudly instead of returning None, which would leave
            # self.engine unusable and fail confusingly later on.
            raise FileNotFoundError(f"No env file found at {env_path}")
        env_vars = self._read_env_file(env_path)
        try:
            conn_str = env_vars[conn_var]
        except KeyError:
            raise KeyError(f"{conn_var} is not defined in {env_path}") from None
        return create_engine(
            self._to_local_sqlalchemy_url(conn_str, container_host, local_host)
        )

    @staticmethod
    def _read_env_file(env_path: Path) -> Dict[str, str]:
        """Parse ``KEY=VALUE`` lines into a dict.

        Blank lines and lines starting with ``#`` (after stripping
        whitespace) are skipped. Only the first ``=`` splits a line, so values
        may themselves contain ``=``. Lines without ``=`` are ignored. When a
        key repeats, the first occurrence wins.
        """
        env_vars: Dict[str, str] = {}
        with open(env_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()  # also drops "\r" from CRLF files
                if not line or line.startswith("#"):
                    continue
                key, sep, value = line.partition("=")
                if sep:
                    env_vars.setdefault(key.strip(), value.strip())
        return env_vars

    @staticmethod
    def _to_local_sqlalchemy_url(
        conn_str: str, container_host: str, local_host: str
    ) -> str:
        """Rewrite an Airflow-style connection URI into a locally reachable
        SQLAlchemy URL.

        SQLAlchemy 1.4+ no longer accepts the ``postgres`` dialect name, so a
        ``postgres://`` or ``postgres+<driver>://`` scheme is renamed to
        ``postgresql``. A URI that already uses ``postgresql`` is left as is.
        The first occurrence of *container_host* is then replaced by
        *local_host*.
        """
        scheme, sep, rest = conn_str.partition("://")
        if sep and (scheme == "postgres" or scheme.startswith("postgres+")):
            conn_str = "postgresql" + scheme[len("postgres"):] + sep + rest
        return conn_str.replace(container_host, local_host, 1)

    def get_data_schema_names(self) -> List[str]:
        """Return the names of all schemas visible through the engine."""
        insp = inspect(self.engine)
        return insp.get_schema_names()

    def get_data_table_names_in_schema(self, schema_name: str) -> List[str]:
        """Return the names of the tables in *schema_name*."""
        insp = inspect(self.engine)
        return insp.get_table_names(schema=schema_name)

    def print_schema_names(self) -> None:
        """Print every schema name as a bulleted list."""
        print("Schemas in the Data Warehouse")
        for schema_name in self.get_data_schema_names():
            print(f"  - {schema_name}")

    def print_table_names_in_schema(self, schema_name: str) -> None:
        """Print every table in *schema_name* as ``schema.table`` bullets."""
        print(f"Tables in the {schema_name} schema")
        for table_name in self.get_data_table_names_in_schema(schema_name=schema_name):
            print(f"  - {schema_name}.{table_name}")

    def execute_result_returning_query(
        self, query: str, params: Optional[Mapping[str, Any]] = None
    ) -> pd.DataFrame:
        """Run *query* and return its result rows as a DataFrame.

        Args:
            query: SQL text. Use ``:name`` placeholders for values.
            params: Optional values for the placeholders. Values are bound by
                the driver instead of being interpolated into the SQL string.

        Returns:
            A DataFrame with one column per result column.
        """
        stmt = text(query)
        # engine.begin() commits on success and rolls back on error under both
        # SQLAlchemy 1.4 and 2.0, so no check of the private Engine._is_future.
        with self.engine.begin() as conn:
            if params is None:
                result = conn.execute(stmt)
            else:
                result = conn.execute(stmt, params)
            results_df = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        return results_df

    def execute_structural_command(
        self, query: str, params: Optional[Mapping[str, Any]] = None
    ) -> None:
        """Execute a statement that returns no rows (e.g. DDL) in a transaction.

        The transaction commits when the statement succeeds and rolls back
        if it raises.
        """
        stmt = text(query)
        with self.engine.begin() as conn:
            if params is None:
                conn.execute(stmt)
            else:
                conn.execute(stmt, params)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant