From bea9da53f8ff60354a759c9f482e16b3c3fc5002 Mon Sep 17 00:00:00 2001 From: Sameer Raheja Date: Mon, 22 Feb 2021 09:04:41 -0800 Subject: [PATCH] Updating getting started Databricks docs (#1768) * Updating getting started Databricks docs Signed-off-by: Sameer Raheja * Update databricks 7.3 link Signed-off-by: Sameer Raheja --- docs/demo/gpu-mortgage_accelerated.ipynb | 378 +++++++++++++++++- .../get-started/getting-started-databricks.md | 12 +- 2 files changed, 386 insertions(+), 4 deletions(-) diff --git a/docs/demo/gpu-mortgage_accelerated.ipynb b/docs/demo/gpu-mortgage_accelerated.ipynb index 2ce911b3a6b..2f2823f0c2d 100644 --- a/docs/demo/gpu-mortgage_accelerated.ipynb +++ b/docs/demo/gpu-mortgage_accelerated.ipynb @@ -1 +1,377 @@ -{"cells":[{"cell_type":"code","source":["%sh\n \nwget http://rapidsai-data.s3-website.us-east-2.amazonaws.com/notebook-mortgage-data/mortgage_2000.tgz -P /Users//\n \nmkdir -p /dbfs/FileStore/tables/mortgage\nmkdir -p /dbfs/FileStore/tables/mortgage_parquet_gpu/perf\nmkdir /dbfs/FileStore/tables/mortgage_parquet_gpu/acq\nmkdir /dbfs/FileStore/tables/mortgage_parquet_gpu/output\n \ntar xfvz /Users//mortgage_2000.tgz --directory /dbfs/FileStore/tables/mortgage\n"],"metadata":{},"outputs":[],"execution_count":1},{"cell_type":"code","source":["import time\nfrom pyspark import broadcast\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import *\nfrom pyspark.sql.types import *\n\ndef _get_quarter_from_csv_file_name():\n return substring_index(substring_index(input_file_name(), '.', 1), '_', -1)\n\n_csv_perf_schema = StructType([\n StructField('loan_id', LongType()),\n StructField('monthly_reporting_period', StringType()),\n StructField('servicer', StringType()),\n StructField('interest_rate', DoubleType()),\n StructField('current_actual_upb', DoubleType()),\n StructField('loan_age', DoubleType()),\n StructField('remaining_months_to_legal_maturity', DoubleType()),\n StructField('adj_remaining_months_to_maturity', DoubleType()),\n StructField('maturity_date', StringType()),\n StructField('msa', DoubleType()),\n StructField('current_loan_delinquency_status', IntegerType()),\n StructField('mod_flag', StringType()),\n StructField('zero_balance_code', StringType()),\n StructField('zero_balance_effective_date', StringType()),\n StructField('last_paid_installment_date', StringType()),\n StructField('foreclosed_after', StringType()),\n StructField('disposition_date', StringType()),\n StructField('foreclosure_costs', DoubleType()),\n StructField('prop_preservation_and_repair_costs', DoubleType()),\n StructField('asset_recovery_costs', DoubleType()),\n StructField('misc_holding_expenses', DoubleType()),\n StructField('holding_taxes', DoubleType()),\n StructField('net_sale_proceeds', DoubleType()),\n StructField('credit_enhancement_proceeds', DoubleType()),\n StructField('repurchase_make_whole_proceeds', StringType()),\n StructField('other_foreclosure_proceeds', DoubleType()),\n StructField('non_interest_bearing_upb', DoubleType()),\n StructField('principal_forgiveness_upb', StringType()),\n StructField('repurchase_make_whole_proceeds_flag', StringType()),\n StructField('foreclosure_principal_write_off_amount', StringType()),\n StructField('servicing_activity_indicator', StringType())])\n_csv_acq_schema = StructType([\n StructField('loan_id', LongType()),\n StructField('orig_channel', StringType()),\n StructField('seller_name', StringType()),\n StructField('orig_interest_rate', DoubleType()),\n StructField('orig_upb', IntegerType()),\n 
StructField('orig_loan_term', IntegerType()),\n StructField('orig_date', StringType()),\n StructField('first_pay_date', StringType()),\n StructField('orig_ltv', DoubleType()),\n StructField('orig_cltv', DoubleType()),\n StructField('num_borrowers', DoubleType()),\n StructField('dti', DoubleType()),\n StructField('borrower_credit_score', DoubleType()),\n StructField('first_home_buyer', StringType()),\n StructField('loan_purpose', StringType()),\n StructField('property_type', StringType()),\n StructField('num_units', IntegerType()),\n StructField('occupancy_status', StringType()),\n StructField('property_state', StringType()),\n StructField('zip', IntegerType()),\n StructField('mortgage_insurance_percent', DoubleType()),\n StructField('product_type', StringType()),\n StructField('coborrow_credit_score', DoubleType()),\n StructField('mortgage_insurance_type', DoubleType()),\n StructField('relocation_mortgage_indicator', StringType())])\n\ndef read_perf_csv(spark, path):\n return spark.read.format('csv') \\\n .option('nullValue', '') \\\n .option('header', 'false') \\\n .option('delimiter', '|') \\\n .schema(_csv_perf_schema) \\\n .load(path) \\\n .withColumn('quarter', _get_quarter_from_csv_file_name())\n\ndef read_acq_csv(spark, path):\n return spark.read.format('csv') \\\n .option('nullValue', '') \\\n .option('header', 'false') \\\n .option('delimiter', '|') \\\n .schema(_csv_acq_schema) \\\n .load(path) \\\n .withColumn('quarter', _get_quarter_from_csv_file_name())\n\ndef _parse_dates(perf):\n return perf \\\n .withColumn('monthly_reporting_period', to_date(col('monthly_reporting_period'), 'MM/dd/yyyy')) \\\n .withColumn('monthly_reporting_period_month', month(col('monthly_reporting_period'))) \\\n .withColumn('monthly_reporting_period_year', year(col('monthly_reporting_period'))) \\\n .withColumn('monthly_reporting_period_day', dayofmonth(col('monthly_reporting_period'))) \\\n .withColumn('last_paid_installment_date', to_date(col('last_paid_installment_date'), 'MM/dd/yyyy')) \\\n .withColumn('foreclosed_after', to_date(col('foreclosed_after'), 'MM/dd/yyyy')) \\\n .withColumn('disposition_date', to_date(col('disposition_date'), 'MM/dd/yyyy')) \\\n .withColumn('maturity_date', to_date(col('maturity_date'), 'MM/yyyy')) \\\n .withColumn('zero_balance_effective_date', to_date(col('zero_balance_effective_date'), 'MM/yyyy'))\n\ndef _create_perf_deliquency(spark, perf):\n aggDF = perf.select(\n col(\"quarter\"),\n col(\"loan_id\"),\n col(\"current_loan_delinquency_status\"),\n when(col(\"current_loan_delinquency_status\") >= 1, col(\"monthly_reporting_period\")).alias(\"delinquency_30\"),\n when(col(\"current_loan_delinquency_status\") >= 3, col(\"monthly_reporting_period\")).alias(\"delinquency_90\"),\n when(col(\"current_loan_delinquency_status\") >= 6, col(\"monthly_reporting_period\")).alias(\"delinquency_180\")) \\\n .groupBy(\"quarter\", \"loan_id\") \\\n .agg(\n max(\"current_loan_delinquency_status\").alias(\"delinquency_12\"),\n min(\"delinquency_30\").alias(\"delinquency_30\"),\n min(\"delinquency_90\").alias(\"delinquency_90\"),\n min(\"delinquency_180\").alias(\"delinquency_180\")) \\\n .select(\n col(\"quarter\"),\n col(\"loan_id\"),\n (col(\"delinquency_12\") >= 1).alias(\"ever_30\"),\n (col(\"delinquency_12\") >= 3).alias(\"ever_90\"),\n (col(\"delinquency_12\") >= 6).alias(\"ever_180\"),\n col(\"delinquency_30\"),\n col(\"delinquency_90\"),\n col(\"delinquency_180\"))\n joinedDf = perf \\\n .withColumnRenamed(\"monthly_reporting_period\", \"timestamp\") \\\n 
.withColumnRenamed(\"monthly_reporting_period_month\", \"timestamp_month\") \\\n .withColumnRenamed(\"monthly_reporting_period_year\", \"timestamp_year\") \\\n .withColumnRenamed(\"current_loan_delinquency_status\", \"delinquency_12\") \\\n .withColumnRenamed(\"current_actual_upb\", \"upb_12\") \\\n .select(\"quarter\", \"loan_id\", \"timestamp\", \"delinquency_12\", \"upb_12\", \"timestamp_month\", \"timestamp_year\") \\\n .join(aggDF, [\"loan_id\", \"quarter\"], \"left_outer\")\n\n # calculate the 12 month delinquency and upb values\n months = 12\n monthArray = [lit(x) for x in range(0, 12)]\n # explode on a small amount of data is actually slightly more efficient than a cross join\n testDf = joinedDf \\\n .withColumn(\"month_y\", explode(array(monthArray))) \\\n .select(\n col(\"quarter\"),\n floor(((col(\"timestamp_year\") * 12 + col(\"timestamp_month\")) - 24000) / months).alias(\"josh_mody\"),\n floor(((col(\"timestamp_year\") * 12 + col(\"timestamp_month\")) - 24000 - col(\"month_y\")) / months).alias(\"josh_mody_n\"),\n col(\"ever_30\"),\n col(\"ever_90\"),\n col(\"ever_180\"),\n col(\"delinquency_30\"),\n col(\"delinquency_90\"),\n col(\"delinquency_180\"),\n col(\"loan_id\"),\n col(\"month_y\"),\n col(\"delinquency_12\"),\n col(\"upb_12\")) \\\n .groupBy(\"quarter\", \"loan_id\", \"josh_mody_n\", \"ever_30\", \"ever_90\", \"ever_180\", \"delinquency_30\", \"delinquency_90\", \"delinquency_180\", \"month_y\") \\\n .agg(max(\"delinquency_12\").alias(\"delinquency_12\"), min(\"upb_12\").alias(\"upb_12\")) \\\n .withColumn(\"timestamp_year\", floor((lit(24000) + (col(\"josh_mody_n\") * lit(months)) + (col(\"month_y\") - 1)) / lit(12))) \\\n .selectExpr('*', 'pmod(24000 + (josh_mody_n * {}) + month_y, 12) as timestamp_month_tmp'.format(months)) \\\n .withColumn(\"timestamp_month\", when(col(\"timestamp_month_tmp\") == lit(0), lit(12)).otherwise(col(\"timestamp_month_tmp\"))) \\\n .withColumn(\"delinquency_12\", ((col(\"delinquency_12\") > 3).cast(\"int\") + (col(\"upb_12\") == 0).cast(\"int\")).alias(\"delinquency_12\")) \\\n .drop(\"timestamp_month_tmp\", \"josh_mody_n\", \"month_y\")\n\n return perf.withColumnRenamed(\"monthly_reporting_period_month\", \"timestamp_month\") \\\n .withColumnRenamed(\"monthly_reporting_period_year\", \"timestamp_year\") \\\n .join(testDf, [\"quarter\", \"loan_id\", \"timestamp_year\", \"timestamp_month\"], \"left\") \\\n .drop(\"timestamp_year\", \"timestamp_month\")\n\n_name_mapping = [\n (\"WITMER FUNDING, LLC\", \"Witmer\"),\n (\"WELLS FARGO CREDIT RISK TRANSFER SECURITIES TRUST 2015\", \"Wells Fargo\"),\n (\"WELLS FARGO BANK, NA\" , \"Wells Fargo\"),\n (\"WELLS FARGO BANK, N.A.\" , \"Wells Fargo\"),\n (\"WELLS FARGO BANK, NA\" , \"Wells Fargo\"),\n (\"USAA FEDERAL SAVINGS BANK\" , \"USAA\"),\n (\"UNITED SHORE FINANCIAL SERVICES, LLC D\\\\/B\\\\/A UNITED WHOLESALE MORTGAGE\" , \"United Seq(e\"),\n (\"U.S. 
BANK N.A.\" , \"US Bank\"),\n (\"SUNTRUST MORTGAGE INC.\" , \"Suntrust\"),\n (\"STONEGATE MORTGAGE CORPORATION\" , \"Stonegate Mortgage\"),\n (\"STEARNS LENDING, LLC\" , \"Stearns Lending\"),\n (\"STEARNS LENDING, INC.\" , \"Stearns Lending\"),\n (\"SIERRA PACIFIC MORTGAGE COMPANY, INC.\" , \"Sierra Pacific Mortgage\"),\n (\"REGIONS BANK\" , \"Regions\"),\n (\"RBC MORTGAGE COMPANY\" , \"RBC\"),\n (\"QUICKEN LOANS INC.\" , \"Quicken Loans\"),\n (\"PULTE MORTGAGE, L.L.C.\" , \"Pulte Mortgage\"),\n (\"PROVIDENT FUNDING ASSOCIATES, L.P.\" , \"Provident Funding\"),\n (\"PROSPECT MORTGAGE, LLC\" , \"Prospect Mortgage\"),\n (\"PRINCIPAL RESIDENTIAL MORTGAGE CAPITAL RESOURCES, LLC\" , \"Principal Residential\"),\n (\"PNC BANK, N.A.\" , \"PNC\"),\n (\"PMT CREDIT RISK TRANSFER TRUST 2015-2\" , \"PennyMac\"),\n (\"PHH MORTGAGE CORPORATION\" , \"PHH Mortgage\"),\n (\"PENNYMAC CORP.\" , \"PennyMac\"),\n (\"PACIFIC UNION FINANCIAL, LLC\" , \"Other\"),\n (\"OTHER\" , \"Other\"),\n (\"NYCB MORTGAGE COMPANY, LLC\" , \"NYCB\"),\n (\"NEW YORK COMMUNITY BANK\" , \"NYCB\"),\n (\"NETBANK FUNDING SERVICES\" , \"Netbank\"),\n (\"NATIONSTAR MORTGAGE, LLC\" , \"Nationstar Mortgage\"),\n (\"METLIFE BANK, NA\" , \"Metlife\"),\n (\"LOANDEPOT.COM, LLC\" , \"LoanDepot.com\"),\n (\"J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2015-1\" , \"JP Morgan Chase\"),\n (\"J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2014-1\" , \"JP Morgan Chase\"),\n (\"JPMORGAN CHASE BANK, NATIONAL ASSOCIATION\" , \"JP Morgan Chase\"),\n (\"JPMORGAN CHASE BANK, NA\" , \"JP Morgan Chase\"),\n (\"JP MORGAN CHASE BANK, NA\" , \"JP Morgan Chase\"),\n (\"IRWIN MORTGAGE, CORPORATION\" , \"Irwin Mortgage\"),\n (\"IMPAC MORTGAGE CORP.\" , \"Impac Mortgage\"),\n (\"HSBC BANK USA, NATIONAL ASSOCIATION\" , \"HSBC\"),\n (\"HOMEWARD RESIDENTIAL, INC.\" , \"Homeward Mortgage\"),\n (\"HOMESTREET BANK\" , \"Other\"),\n (\"HOMEBRIDGE FINANCIAL SERVICES, INC.\" , \"HomeBridge\"),\n (\"HARWOOD STREET FUNDING I, LLC\" , \"Harwood Mortgage\"),\n (\"GUILD MORTGAGE COMPANY\" , \"Guild Mortgage\"),\n (\"GMAC MORTGAGE, LLC (USAA FEDERAL SAVINGS BANK)\" , \"GMAC\"),\n (\"GMAC MORTGAGE, LLC\" , \"GMAC\"),\n (\"GMAC (USAA)\" , \"GMAC\"),\n (\"FREMONT BANK\" , \"Fremont Bank\"),\n (\"FREEDOM MORTGAGE CORP.\" , \"Freedom Mortgage\"),\n (\"FRANKLIN AMERICAN MORTGAGE COMPANY\" , \"Franklin America\"),\n (\"FLEET NATIONAL BANK\" , \"Fleet National\"),\n (\"FLAGSTAR CAPITAL MARKETS CORPORATION\" , \"Flagstar Bank\"),\n (\"FLAGSTAR BANK, FSB\" , \"Flagstar Bank\"),\n (\"FIRST TENNESSEE BANK NATIONAL ASSOCIATION\" , \"Other\"),\n (\"FIFTH THIRD BANK\" , \"Fifth Third Bank\"),\n (\"FEDERAL HOME LOAN BANK OF CHICAGO\" , \"Fedral Home of Chicago\"),\n (\"FDIC, RECEIVER, INDYMAC FEDERAL BANK FSB\" , \"FDIC\"),\n (\"DOWNEY SAVINGS AND LOAN ASSOCIATION, F.A.\" , \"Downey Mortgage\"),\n (\"DITECH FINANCIAL LLC\" , \"Ditech\"),\n (\"CITIMORTGAGE, INC.\" , \"Citi\"),\n (\"CHICAGO MORTGAGE SOLUTIONS DBA INTERFIRST MORTGAGE COMPANY\" , \"Chicago Mortgage\"),\n (\"CHICAGO MORTGAGE SOLUTIONS DBA INTERBANK MORTGAGE COMPANY\" , \"Chicago Mortgage\"),\n (\"CHASE HOME FINANCE, LLC\" , \"JP Morgan Chase\"),\n (\"CHASE HOME FINANCE FRANKLIN AMERICAN MORTGAGE COMPANY\" , \"JP Morgan Chase\"),\n (\"CHASE HOME FINANCE (CIE 1)\" , \"JP Morgan Chase\"),\n (\"CHASE HOME FINANCE\" , \"JP Morgan Chase\"),\n (\"CASHCALL, INC.\" , \"CashCall\"),\n (\"CAPITAL ONE, NATIONAL ASSOCIATION\" , \"Capital One\"),\n (\"CALIBER HOME LOANS, INC.\" , \"Caliber Funding\"),\n (\"BISHOPS GATE RESIDENTIAL 
MORTGAGE TRUST\" , \"Bishops Gate Mortgage\"),\n (\"BANK OF AMERICA, N.A.\" , \"Bank of America\"),\n (\"AMTRUST BANK\" , \"AmTrust\"),\n (\"AMERISAVE MORTGAGE CORPORATION\" , \"Amerisave\"),\n (\"AMERIHOME MORTGAGE COMPANY, LLC\" , \"AmeriHome Mortgage\"),\n (\"ALLY BANK\" , \"Ally Bank\"),\n (\"ACADEMY MORTGAGE CORPORATION\" , \"Academy Mortgage\"),\n (\"NO CASH-OUT REFINANCE\" , \"OTHER REFINANCE\"),\n (\"REFINANCE - NOT SPECIFIED\" , \"OTHER REFINANCE\"),\n (\"Other REFINANCE\" , \"OTHER REFINANCE\")]\n\ndef _create_acquisition(spark, acq):\n nameMapping = spark.createDataFrame(_name_mapping, [\"from_seller_name\", \"to_seller_name\"])\n return acq.join(nameMapping, col(\"seller_name\") == col(\"from_seller_name\"), \"left\") \\\n .drop(\"from_seller_name\") \\\n .withColumn(\"old_name\", col(\"seller_name\")) \\\n .withColumn(\"seller_name\", coalesce(col(\"to_seller_name\"), col(\"seller_name\"))) \\\n .drop(\"to_seller_name\") \\\n .withColumn(\"orig_date\", to_date(col(\"orig_date\"), \"MM/yyyy\")) \\\n .withColumn(\"first_pay_date\", to_date(col(\"first_pay_date\"), \"MM/yyyy\")) \\\n\ndef run_mortgage(spark, perf, acq):\n parsed_perf = _parse_dates(perf)\n perf_deliqency = _create_perf_deliquency(spark, parsed_perf)\n cleaned_acq = _create_acquisition(spark, acq)\n return perf_deliqency.join(cleaned_acq, [\"loan_id\", \"quarter\"], \"inner\").drop(\"quarter\")"],"metadata":{},"outputs":[],"execution_count":2},{"cell_type":"code","source":["orig_perf_path='dbfs:///FileStore/tables/mortgage/perf/*'\norig_acq_path='dbfs:///FileStore/tables/mortgage/acq/*'\ntmp_perf_path='dbfs:///FileStore/tables/mortgage_parquet_gpu/perf/'\ntmp_acq_path='dbfs:///FileStore/tables/mortgage_parquet_gpu/acq/'\noutput_path='dbfs:///FileStore/tables/mortgage_parquet_gpu/output/'\n\nspark.conf.set('spark.rapids.sql.enabled','true')\nspark.conf.set('spark.rapids.sql.explain', 'ALL')\nspark.conf.set('spark.rapids.sql.incompatibleOps.enabled', 'true')\nspark.conf.set('spark.rapids.sql.batchSizeBytes', '512M')\nspark.conf.set('spark.rapids.sql.reader.batchSizeBytes', '768M')"],"metadata":{},"outputs":[],"execution_count":3},{"cell_type":"code","source":["# Lets transcode the data first\nstart = time.time()\n# we want a few big files instead of lots of small files\nspark.conf.set('spark.sql.files.maxPartitionBytes', '200G')\nacq = read_acq_csv(spark, orig_acq_path)\nacq.repartition(12).write.parquet(tmp_acq_path, mode='overwrite')\nperf = read_perf_csv(spark, orig_perf_path)\nperf.coalesce(96).write.parquet(tmp_perf_path, mode='overwrite')\nend = time.time()\nprint(end - start)"],"metadata":{},"outputs":[],"execution_count":4},{"cell_type":"code","source":["# Now lets actually process the data\\n\",\nstart = time.time()\nspark.conf.set('spark.sql.files.maxPartitionBytes', '1G')\nspark.conf.set('spark.sql.shuffle.partitions', '192')\nperf = spark.read.parquet(tmp_perf_path)\nacq = spark.read.parquet(tmp_acq_path)\nout = run_mortgage(spark, perf, acq)\nout.write.parquet(output_path, mode='overwrite')\nend = time.time()\nprint(end - start)\n"],"metadata":{},"outputs":[],"execution_count":5},{"cell_type":"code","source":[""],"metadata":{},"outputs":[],"execution_count":6}],"metadata":{"name":"gpu-mortgage_kr","notebookId":2710846968050572},"nbformat":4,"nbformat_minor":0} +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "%sh\n", + "\n", + "USER_ID=\n", + "\n", + "wget 
http://rapidsai-data.s3-website.us-east-2.amazonaws.com/notebook-mortgage-data/mortgage_2000.tgz -P /Users/${USER_ID}/\n", + " \n", + "mkdir -p /dbfs/FileStore/tables/mortgage\n", + "mkdir -p /dbfs/FileStore/tables/mortgage_parquet_gpu/perf\n", + "mkdir /dbfs/FileStore/tables/mortgage_parquet_gpu/acq\n", + "mkdir /dbfs/FileStore/tables/mortgage_parquet_gpu/output\n", + " \n", + "tar xfvz /Users/${USER_ID}/mortgage_2000.tgz --directory /dbfs/FileStore/tables/mortgage\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "from pyspark import broadcast\n", + "from pyspark.sql import SparkSession\n", + "from pyspark.sql.functions import *\n", + "from pyspark.sql.types import *\n", + "\n", + "def _get_quarter_from_csv_file_name():\n", + " return substring_index(substring_index(input_file_name(), '.', 1), '_', -1)\n", + "\n", + "_csv_perf_schema = StructType([\n", + " StructField('loan_id', LongType()),\n", + " StructField('monthly_reporting_period', StringType()),\n", + " StructField('servicer', StringType()),\n", + " StructField('interest_rate', DoubleType()),\n", + " StructField('current_actual_upb', DoubleType()),\n", + " StructField('loan_age', DoubleType()),\n", + " StructField('remaining_months_to_legal_maturity', DoubleType()),\n", + " StructField('adj_remaining_months_to_maturity', DoubleType()),\n", + " StructField('maturity_date', StringType()),\n", + " StructField('msa', DoubleType()),\n", + " StructField('current_loan_delinquency_status', IntegerType()),\n", + " StructField('mod_flag', StringType()),\n", + " StructField('zero_balance_code', StringType()),\n", + " StructField('zero_balance_effective_date', StringType()),\n", + " StructField('last_paid_installment_date', StringType()),\n", + " StructField('foreclosed_after', StringType()),\n", + " StructField('disposition_date', StringType()),\n", + " StructField('foreclosure_costs', DoubleType()),\n", + " StructField('prop_preservation_and_repair_costs', DoubleType()),\n", + " StructField('asset_recovery_costs', DoubleType()),\n", + " StructField('misc_holding_expenses', DoubleType()),\n", + " StructField('holding_taxes', DoubleType()),\n", + " StructField('net_sale_proceeds', DoubleType()),\n", + " StructField('credit_enhancement_proceeds', DoubleType()),\n", + " StructField('repurchase_make_whole_proceeds', StringType()),\n", + " StructField('other_foreclosure_proceeds', DoubleType()),\n", + " StructField('non_interest_bearing_upb', DoubleType()),\n", + " StructField('principal_forgiveness_upb', StringType()),\n", + " StructField('repurchase_make_whole_proceeds_flag', StringType()),\n", + " StructField('foreclosure_principal_write_off_amount', StringType()),\n", + " StructField('servicing_activity_indicator', StringType())])\n", + "_csv_acq_schema = StructType([\n", + " StructField('loan_id', LongType()),\n", + " StructField('orig_channel', StringType()),\n", + " StructField('seller_name', StringType()),\n", + " StructField('orig_interest_rate', DoubleType()),\n", + " StructField('orig_upb', IntegerType()),\n", + " StructField('orig_loan_term', IntegerType()),\n", + " StructField('orig_date', StringType()),\n", + " StructField('first_pay_date', StringType()),\n", + " StructField('orig_ltv', DoubleType()),\n", + " StructField('orig_cltv', DoubleType()),\n", + " StructField('num_borrowers', DoubleType()),\n", + " StructField('dti', DoubleType()),\n", + " StructField('borrower_credit_score', DoubleType()),\n", + " StructField('first_home_buyer', 
StringType()),\n", + " StructField('loan_purpose', StringType()),\n", + " StructField('property_type', StringType()),\n", + " StructField('num_units', IntegerType()),\n", + " StructField('occupancy_status', StringType()),\n", + " StructField('property_state', StringType()),\n", + " StructField('zip', IntegerType()),\n", + " StructField('mortgage_insurance_percent', DoubleType()),\n", + " StructField('product_type', StringType()),\n", + " StructField('coborrow_credit_score', DoubleType()),\n", + " StructField('mortgage_insurance_type', DoubleType()),\n", + " StructField('relocation_mortgage_indicator', StringType())])\n", + "\n", + "def read_perf_csv(spark, path):\n", + " return spark.read.format('csv') \\\n", + " .option('nullValue', '') \\\n", + " .option('header', 'false') \\\n", + " .option('delimiter', '|') \\\n", + " .schema(_csv_perf_schema) \\\n", + " .load(path) \\\n", + " .withColumn('quarter', _get_quarter_from_csv_file_name())\n", + "\n", + "def read_acq_csv(spark, path):\n", + " return spark.read.format('csv') \\\n", + " .option('nullValue', '') \\\n", + " .option('header', 'false') \\\n", + " .option('delimiter', '|') \\\n", + " .schema(_csv_acq_schema) \\\n", + " .load(path) \\\n", + " .withColumn('quarter', _get_quarter_from_csv_file_name())\n", + "\n", + "def _parse_dates(perf):\n", + " return perf \\\n", + " .withColumn('monthly_reporting_period', to_date(col('monthly_reporting_period'), 'MM/dd/yyyy')) \\\n", + " .withColumn('monthly_reporting_period_month', month(col('monthly_reporting_period'))) \\\n", + " .withColumn('monthly_reporting_period_year', year(col('monthly_reporting_period'))) \\\n", + " .withColumn('monthly_reporting_period_day', dayofmonth(col('monthly_reporting_period'))) \\\n", + " .withColumn('last_paid_installment_date', to_date(col('last_paid_installment_date'), 'MM/dd/yyyy')) \\\n", + " .withColumn('foreclosed_after', to_date(col('foreclosed_after'), 'MM/dd/yyyy')) \\\n", + " .withColumn('disposition_date', to_date(col('disposition_date'), 'MM/dd/yyyy')) \\\n", + " .withColumn('maturity_date', to_date(col('maturity_date'), 'MM/yyyy')) \\\n", + " .withColumn('zero_balance_effective_date', to_date(col('zero_balance_effective_date'), 'MM/yyyy'))\n", + "\n", + "def _create_perf_deliquency(spark, perf):\n", + " aggDF = perf.select(\n", + " col(\"quarter\"),\n", + " col(\"loan_id\"),\n", + " col(\"current_loan_delinquency_status\"),\n", + " when(col(\"current_loan_delinquency_status\") >= 1, col(\"monthly_reporting_period\")).alias(\"delinquency_30\"),\n", + " when(col(\"current_loan_delinquency_status\") >= 3, col(\"monthly_reporting_period\")).alias(\"delinquency_90\"),\n", + " when(col(\"current_loan_delinquency_status\") >= 6, col(\"monthly_reporting_period\")).alias(\"delinquency_180\")) \\\n", + " .groupBy(\"quarter\", \"loan_id\") \\\n", + " .agg(\n", + " max(\"current_loan_delinquency_status\").alias(\"delinquency_12\"),\n", + " min(\"delinquency_30\").alias(\"delinquency_30\"),\n", + " min(\"delinquency_90\").alias(\"delinquency_90\"),\n", + " min(\"delinquency_180\").alias(\"delinquency_180\")) \\\n", + " .select(\n", + " col(\"quarter\"),\n", + " col(\"loan_id\"),\n", + " (col(\"delinquency_12\") >= 1).alias(\"ever_30\"),\n", + " (col(\"delinquency_12\") >= 3).alias(\"ever_90\"),\n", + " (col(\"delinquency_12\") >= 6).alias(\"ever_180\"),\n", + " col(\"delinquency_30\"),\n", + " col(\"delinquency_90\"),\n", + " col(\"delinquency_180\"))\n", + " joinedDf = perf \\\n", + " .withColumnRenamed(\"monthly_reporting_period\", \"timestamp\") \\\n", + " 
.withColumnRenamed(\"monthly_reporting_period_month\", \"timestamp_month\") \\\n", + " .withColumnRenamed(\"monthly_reporting_period_year\", \"timestamp_year\") \\\n", + " .withColumnRenamed(\"current_loan_delinquency_status\", \"delinquency_12\") \\\n", + " .withColumnRenamed(\"current_actual_upb\", \"upb_12\") \\\n", + " .select(\"quarter\", \"loan_id\", \"timestamp\", \"delinquency_12\", \"upb_12\", \"timestamp_month\", \"timestamp_year\") \\\n", + " .join(aggDF, [\"loan_id\", \"quarter\"], \"left_outer\")\n", + "\n", + " # calculate the 12 month delinquency and upb values\n", + " months = 12\n", + " monthArray = [lit(x) for x in range(0, 12)]\n", + " # explode on a small amount of data is actually slightly more efficient than a cross join\n", + " testDf = joinedDf \\\n", + " .withColumn(\"month_y\", explode(array(monthArray))) \\\n", + " .select(\n", + " col(\"quarter\"),\n", + " floor(((col(\"timestamp_year\") * 12 + col(\"timestamp_month\")) - 24000) / months).alias(\"josh_mody\"),\n", + " floor(((col(\"timestamp_year\") * 12 + col(\"timestamp_month\")) - 24000 - col(\"month_y\")) / months).alias(\"josh_mody_n\"),\n", + " col(\"ever_30\"),\n", + " col(\"ever_90\"),\n", + " col(\"ever_180\"),\n", + " col(\"delinquency_30\"),\n", + " col(\"delinquency_90\"),\n", + " col(\"delinquency_180\"),\n", + " col(\"loan_id\"),\n", + " col(\"month_y\"),\n", + " col(\"delinquency_12\"),\n", + " col(\"upb_12\")) \\\n", + " .groupBy(\"quarter\", \"loan_id\", \"josh_mody_n\", \"ever_30\", \"ever_90\", \"ever_180\", \"delinquency_30\", \"delinquency_90\", \"delinquency_180\", \"month_y\") \\\n", + " .agg(max(\"delinquency_12\").alias(\"delinquency_12\"), min(\"upb_12\").alias(\"upb_12\")) \\\n", + " .withColumn(\"timestamp_year\", floor((lit(24000) + (col(\"josh_mody_n\") * lit(months)) + (col(\"month_y\") - 1)) / lit(12))) \\\n", + " .selectExpr('*', 'pmod(24000 + (josh_mody_n * {}) + month_y, 12) as timestamp_month_tmp'.format(months)) \\\n", + " .withColumn(\"timestamp_month\", when(col(\"timestamp_month_tmp\") == lit(0), lit(12)).otherwise(col(\"timestamp_month_tmp\"))) \\\n", + " .withColumn(\"delinquency_12\", ((col(\"delinquency_12\") > 3).cast(\"int\") + (col(\"upb_12\") == 0).cast(\"int\")).alias(\"delinquency_12\")) \\\n", + " .drop(\"timestamp_month_tmp\", \"josh_mody_n\", \"month_y\")\n", + "\n", + " return perf.withColumnRenamed(\"monthly_reporting_period_month\", \"timestamp_month\") \\\n", + " .withColumnRenamed(\"monthly_reporting_period_year\", \"timestamp_year\") \\\n", + " .join(testDf, [\"quarter\", \"loan_id\", \"timestamp_year\", \"timestamp_month\"], \"left\") \\\n", + " .drop(\"timestamp_year\", \"timestamp_month\")\n", + "\n", + "_name_mapping = [\n", + " (\"WITMER FUNDING, LLC\", \"Witmer\"),\n", + " (\"WELLS FARGO CREDIT RISK TRANSFER SECURITIES TRUST 2015\", \"Wells Fargo\"),\n", + " (\"WELLS FARGO BANK, NA\" , \"Wells Fargo\"),\n", + " (\"WELLS FARGO BANK, N.A.\" , \"Wells Fargo\"),\n", + " (\"WELLS FARGO BANK, NA\" , \"Wells Fargo\"),\n", + " (\"USAA FEDERAL SAVINGS BANK\" , \"USAA\"),\n", + " (\"UNITED SHORE FINANCIAL SERVICES, LLC D\\\\/B\\\\/A UNITED WHOLESALE MORTGAGE\" , \"United Seq(e\"),\n", + " (\"U.S. 
BANK N.A.\" , \"US Bank\"),\n", + " (\"SUNTRUST MORTGAGE INC.\" , \"Suntrust\"),\n", + " (\"STONEGATE MORTGAGE CORPORATION\" , \"Stonegate Mortgage\"),\n", + " (\"STEARNS LENDING, LLC\" , \"Stearns Lending\"),\n", + " (\"STEARNS LENDING, INC.\" , \"Stearns Lending\"),\n", + " (\"SIERRA PACIFIC MORTGAGE COMPANY, INC.\" , \"Sierra Pacific Mortgage\"),\n", + " (\"REGIONS BANK\" , \"Regions\"),\n", + " (\"RBC MORTGAGE COMPANY\" , \"RBC\"),\n", + " (\"QUICKEN LOANS INC.\" , \"Quicken Loans\"),\n", + " (\"PULTE MORTGAGE, L.L.C.\" , \"Pulte Mortgage\"),\n", + " (\"PROVIDENT FUNDING ASSOCIATES, L.P.\" , \"Provident Funding\"),\n", + " (\"PROSPECT MORTGAGE, LLC\" , \"Prospect Mortgage\"),\n", + " (\"PRINCIPAL RESIDENTIAL MORTGAGE CAPITAL RESOURCES, LLC\" , \"Principal Residential\"),\n", + " (\"PNC BANK, N.A.\" , \"PNC\"),\n", + " (\"PMT CREDIT RISK TRANSFER TRUST 2015-2\" , \"PennyMac\"),\n", + " (\"PHH MORTGAGE CORPORATION\" , \"PHH Mortgage\"),\n", + " (\"PENNYMAC CORP.\" , \"PennyMac\"),\n", + " (\"PACIFIC UNION FINANCIAL, LLC\" , \"Other\"),\n", + " (\"OTHER\" , \"Other\"),\n", + " (\"NYCB MORTGAGE COMPANY, LLC\" , \"NYCB\"),\n", + " (\"NEW YORK COMMUNITY BANK\" , \"NYCB\"),\n", + " (\"NETBANK FUNDING SERVICES\" , \"Netbank\"),\n", + " (\"NATIONSTAR MORTGAGE, LLC\" , \"Nationstar Mortgage\"),\n", + " (\"METLIFE BANK, NA\" , \"Metlife\"),\n", + " (\"LOANDEPOT.COM, LLC\" , \"LoanDepot.com\"),\n", + " (\"J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2015-1\" , \"JP Morgan Chase\"),\n", + " (\"J.P. MORGAN MADISON AVENUE SECURITIES TRUST, SERIES 2014-1\" , \"JP Morgan Chase\"),\n", + " (\"JPMORGAN CHASE BANK, NATIONAL ASSOCIATION\" , \"JP Morgan Chase\"),\n", + " (\"JPMORGAN CHASE BANK, NA\" , \"JP Morgan Chase\"),\n", + " (\"JP MORGAN CHASE BANK, NA\" , \"JP Morgan Chase\"),\n", + " (\"IRWIN MORTGAGE, CORPORATION\" , \"Irwin Mortgage\"),\n", + " (\"IMPAC MORTGAGE CORP.\" , \"Impac Mortgage\"),\n", + " (\"HSBC BANK USA, NATIONAL ASSOCIATION\" , \"HSBC\"),\n", + " (\"HOMEWARD RESIDENTIAL, INC.\" , \"Homeward Mortgage\"),\n", + " (\"HOMESTREET BANK\" , \"Other\"),\n", + " (\"HOMEBRIDGE FINANCIAL SERVICES, INC.\" , \"HomeBridge\"),\n", + " (\"HARWOOD STREET FUNDING I, LLC\" , \"Harwood Mortgage\"),\n", + " (\"GUILD MORTGAGE COMPANY\" , \"Guild Mortgage\"),\n", + " (\"GMAC MORTGAGE, LLC (USAA FEDERAL SAVINGS BANK)\" , \"GMAC\"),\n", + " (\"GMAC MORTGAGE, LLC\" , \"GMAC\"),\n", + " (\"GMAC (USAA)\" , \"GMAC\"),\n", + " (\"FREMONT BANK\" , \"Fremont Bank\"),\n", + " (\"FREEDOM MORTGAGE CORP.\" , \"Freedom Mortgage\"),\n", + " (\"FRANKLIN AMERICAN MORTGAGE COMPANY\" , \"Franklin America\"),\n", + " (\"FLEET NATIONAL BANK\" , \"Fleet National\"),\n", + " (\"FLAGSTAR CAPITAL MARKETS CORPORATION\" , \"Flagstar Bank\"),\n", + " (\"FLAGSTAR BANK, FSB\" , \"Flagstar Bank\"),\n", + " (\"FIRST TENNESSEE BANK NATIONAL ASSOCIATION\" , \"Other\"),\n", + " (\"FIFTH THIRD BANK\" , \"Fifth Third Bank\"),\n", + " (\"FEDERAL HOME LOAN BANK OF CHICAGO\" , \"Fedral Home of Chicago\"),\n", + " (\"FDIC, RECEIVER, INDYMAC FEDERAL BANK FSB\" , \"FDIC\"),\n", + " (\"DOWNEY SAVINGS AND LOAN ASSOCIATION, F.A.\" , \"Downey Mortgage\"),\n", + " (\"DITECH FINANCIAL LLC\" , \"Ditech\"),\n", + " (\"CITIMORTGAGE, INC.\" , \"Citi\"),\n", + " (\"CHICAGO MORTGAGE SOLUTIONS DBA INTERFIRST MORTGAGE COMPANY\" , \"Chicago Mortgage\"),\n", + " (\"CHICAGO MORTGAGE SOLUTIONS DBA INTERBANK MORTGAGE COMPANY\" , \"Chicago Mortgage\"),\n", + " (\"CHASE HOME FINANCE, LLC\" , \"JP Morgan Chase\"),\n", + " (\"CHASE HOME FINANCE FRANKLIN AMERICAN 
MORTGAGE COMPANY\" , \"JP Morgan Chase\"),\n", + " (\"CHASE HOME FINANCE (CIE 1)\" , \"JP Morgan Chase\"),\n", + " (\"CHASE HOME FINANCE\" , \"JP Morgan Chase\"),\n", + " (\"CASHCALL, INC.\" , \"CashCall\"),\n", + " (\"CAPITAL ONE, NATIONAL ASSOCIATION\" , \"Capital One\"),\n", + " (\"CALIBER HOME LOANS, INC.\" , \"Caliber Funding\"),\n", + " (\"BISHOPS GATE RESIDENTIAL MORTGAGE TRUST\" , \"Bishops Gate Mortgage\"),\n", + " (\"BANK OF AMERICA, N.A.\" , \"Bank of America\"),\n", + " (\"AMTRUST BANK\" , \"AmTrust\"),\n", + " (\"AMERISAVE MORTGAGE CORPORATION\" , \"Amerisave\"),\n", + " (\"AMERIHOME MORTGAGE COMPANY, LLC\" , \"AmeriHome Mortgage\"),\n", + " (\"ALLY BANK\" , \"Ally Bank\"),\n", + " (\"ACADEMY MORTGAGE CORPORATION\" , \"Academy Mortgage\"),\n", + " (\"NO CASH-OUT REFINANCE\" , \"OTHER REFINANCE\"),\n", + " (\"REFINANCE - NOT SPECIFIED\" , \"OTHER REFINANCE\"),\n", + " (\"Other REFINANCE\" , \"OTHER REFINANCE\")]\n", + "\n", + "def _create_acquisition(spark, acq):\n", + " nameMapping = spark.createDataFrame(_name_mapping, [\"from_seller_name\", \"to_seller_name\"])\n", + " return acq.join(nameMapping, col(\"seller_name\") == col(\"from_seller_name\"), \"left\") \\\n", + " .drop(\"from_seller_name\") \\\n", + " .withColumn(\"old_name\", col(\"seller_name\")) \\\n", + " .withColumn(\"seller_name\", coalesce(col(\"to_seller_name\"), col(\"seller_name\"))) \\\n", + " .drop(\"to_seller_name\") \\\n", + " .withColumn(\"orig_date\", to_date(col(\"orig_date\"), \"MM/yyyy\")) \\\n", + " .withColumn(\"first_pay_date\", to_date(col(\"first_pay_date\"), \"MM/yyyy\")) \\\n", + "\n", + "def run_mortgage(spark, perf, acq):\n", + " parsed_perf = _parse_dates(perf)\n", + " perf_deliqency = _create_perf_deliquency(spark, parsed_perf)\n", + " cleaned_acq = _create_acquisition(spark, acq)\n", + " return perf_deliqency.join(cleaned_acq, [\"loan_id\", \"quarter\"], \"inner\").drop(\"quarter\")" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "orig_perf_path='dbfs:///FileStore/tables/mortgage/perf/*'\n", + "orig_acq_path='dbfs:///FileStore/tables/mortgage/acq/*'\n", + "tmp_perf_path='dbfs:///FileStore/tables/mortgage_parquet_gpu/perf/'\n", + "tmp_acq_path='dbfs:///FileStore/tables/mortgage_parquet_gpu/acq/'\n", + "output_path='dbfs:///FileStore/tables/mortgage_parquet_gpu/output/'\n", + "\n", + "spark.conf.set('spark.rapids.sql.enabled','true')\n", + "spark.conf.set('spark.rapids.sql.explain', 'ALL')\n", + "spark.conf.set('spark.rapids.sql.incompatibleOps.enabled', 'true')\n", + "spark.conf.set('spark.rapids.sql.batchSizeBytes', '512M')\n", + "spark.conf.set('spark.rapids.sql.reader.batchSizeBytes', '768M')" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "# Lets transcode the data first\n", + "start = time.time()\n", + "# we want a few big files instead of lots of small files\n", + "spark.conf.set('spark.sql.files.maxPartitionBytes', '200G')\n", + "acq = read_acq_csv(spark, orig_acq_path)\n", + "acq.repartition(12).write.parquet(tmp_acq_path, mode='overwrite')\n", + "perf = read_perf_csv(spark, orig_perf_path)\n", + "perf.coalesce(96).write.parquet(tmp_perf_path, mode='overwrite')\n", + "end = time.time()\n", + "print(end - start)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "# Now lets actually process the data\\n\",\n", + "start = time.time()\n", + "spark.conf.set('spark.sql.files.maxPartitionBytes', 
'1G')\n", + "spark.conf.set('spark.sql.shuffle.partitions', '192')\n", + "perf = spark.read.parquet(tmp_perf_path)\n", + "acq = spark.read.parquet(tmp_acq_path)\n", + "out = run_mortgage(spark, perf, acq)\n", + "out.write.parquet(output_path, mode='overwrite')\n", + "end = time.time()\n", + "print(end - start)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.4" + }, + "name": "gpu-mortgage_kr", + "notebookId": 2710846968050572 + }, + "nbformat": 4, + "nbformat_minor": 1 +} diff --git a/docs/get-started/getting-started-databricks.md b/docs/get-started/getting-started-databricks.md index 3355c8926f2..cc8a168fe99 100644 --- a/docs/get-started/getting-started-databricks.md +++ b/docs/get-started/getting-started-databricks.md @@ -15,10 +15,14 @@ on NVIDIA GPUs on Databricks. * AWS: 7.3 LTS ML (GPU, Scala 2.12, Spark 3.0.1) * Azure: 7.3 LTS ML (GPU, Scala 2.12, Spark 3.0.1) +[Databricks 7.3 LTS +ML](https://docs.databricks.com/release-notes/runtime/7.3ml.html#system-environment) runs CUDA 10.1 +Update 2, and the initialization scripts will install the appropriate cudf version to match. + The number of GPUs per node dictates the number of Spark executors that can run in that node. ## Start a Databricks Cluster -Create a Databricks cluster by going to Clusters, then clicking “+ Create Cluster”. Ensure the +Create a Databricks cluster by going to Clusters, then clicking `+ Create Cluster`. Ensure the cluster meets the prerequisites above by configuring it as follows: 1. Select the Databricks Runtime Version from one of the supported runtimes specified in the Prerequisites section. @@ -92,15 +96,17 @@ larger dataset if needed. You can find the links to the datasets at ```bash %sh + +USER_ID= -wget http://rapidsai-data.s3-website.us-east-2.amazonaws.com/notebook-mortgage-data/mortgage_2000.tgz -P /Users// +wget http://rapidsai-data.s3-website.us-east-2.amazonaws.com/notebook-mortgage-data/mortgage_2000.tgz -P /Users/${USER_ID}/ mkdir -p /dbfs/FileStore/tables/mortgage mkdir -p /dbfs/FileStore/tables/mortgage_parquet_gpu/perf mkdir /dbfs/FileStore/tables/mortgage_parquet_gpu/acq mkdir /dbfs/FileStore/tables/mortgage_parquet_gpu/output -tar xfvz /Users//mortgage_2000.tgz --directory /dbfs/FileStore/tables/mortgage +tar xfvz /Users/${USER_ID}/mortgage_2000.tgz --directory /dbfs/FileStore/tables/mortgage ``` In Cell 3, update the data paths if necessary. The example notebook merges the columns and prepares