Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tour of Beam] add work example #27080

Merged
merged 11 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/

// beam-playground:
// name: read-query
// description: BigQuery read query example.
// name: read-table
// description: BigQueryIO read table example.
// multifile: false
// context_line: 40
// context_line: 42
// never_run: true
// always_run: true
// categories:
// - Quickstart
// complexity: ADVANCED
Expand All @@ -29,47 +31,49 @@
package main

import (
_ "context"
_ "flag"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
"context"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"

"cloud.google.com/go/bigquery"
internal_log "log"
_ "reflect"
"reflect"
)

// Define the data model: the Game struct models one row of the baseball schedules table.
// The bigquery tag in each struct field maps the field to its BigQuery column.
type CommentRow struct {
Text string `bigquery:"text"`
// Game models one row of the public BigQuery baseball schedules table
// (bigquery-public-data:baseball.schedules). Each field uses a nullable
// wrapper type because any column may be NULL, and the `bigquery` struct
// tag maps the field to its column name in the table.
type Game struct {
GameID bigquery.NullString `bigquery:"gameId"` // unique game identifier
GameNumber bigquery.NullInt64 `bigquery:"gameNumber"`
SeasonID bigquery.NullString `bigquery:"seasonId"`
Year bigquery.NullInt64 `bigquery:"year"`
Type bigquery.NullString `bigquery:"type"` // e.g. season/game type code — see table schema
DayNight bigquery.NullString `bigquery:"dayNight"`
Duration bigquery.NullString `bigquery:"duration"`
}

// Construct the BigQuery query: A constant query is defined that selects the text column
// from the bigquery-public-data.hacker_news.comments table for a certain time range.
const query = `SELECT text
FROM ` + "`bigquery-public-data.hacker_news.comments`" + `
WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01'
LIMIT 1000
`

func main() {
internal_log.Println("Running Task")
/*
ctx := context.Background()
p := beam.NewPipeline()
s := p.Root()
project := "tess-372508"

// Build a PCollection<CommentRow> by querying BigQuery.
rows := bigqueryio.Query(s, project, query,
reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL())
ctx := context.Background()
p := beam.NewPipeline()
s := p.Root()
project := "apache-beam-testing"

debug.Print(s, rows)
// Build a PCollection<CommentRow> by querying BigQuery.
rows := bigqueryio.Query(s, project, "select * from `bigquery-public-data.baseball.schedules`",
reflect.TypeOf(Game{}), bigqueryio.UseStandardSQL())

// Now that the pipeline is fully constructed, we execute it.
if err := beamx.Run(ctx, p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}*/
fixedSizeLines := top.Largest(s, rows, 5, less)

debug.Print(s, fixedSizeLines)
// Now that the pipeline is fully constructed, we execute it.
if err := beamx.Run(ctx, p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}
// less is the comparator handed to top.Largest when selecting 5 rows
// from the query results.
// NOTE(review): it returns true unconditionally, so the ordering is not a
// valid comparison and the "largest" 5 elements are effectively arbitrary —
// confirm whether a real field comparison (e.g. by Year) was intended.
func less(a, b Game) bool {
return true
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@
// tags:
// - hellobeam

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
Expand All @@ -40,52 +39,49 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Task {

public class Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);

public static void main(String[] args) {
LOG.info("Running Task");
System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json");
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setTempLocation("gs://bucket");
options.as(BigQueryOptions.class).setProject("project-id");
private static final String WEATHER_SAMPLES_QUERY =
"select * from `clouddataflow-readonly.samples.weather_stations`";

Pipeline pipeline = Pipeline.create(options);
public static void applyBigQueryTornadoes(Pipeline p) {
/*TypedRead<TableRow> bigqueryIO =
BigQueryIO.readTableRows()
.fromQuery(WEATHER_SAMPLES_QUERY)
.usingStandardSql();

// pCollection.apply(BigQueryIO.read(... - This part of the pipeline reads from a BigQuery table using a SQL query and stores the result in a PCollection.
// The BigQueryIO.read() function is used to read from BigQuery. It is configured with a lambda function to extract a field from each record.
// The .fromQuery("SELECT field FROM project-id.dataset.table")
// specifies the SQL query used to read from BigQuery. You should replace "field", "project-id", "dataset", and "table" with your specific field name, project id, dataset name, and table name, respectively.
/*
PCollection<Double> pCollection = pipeline
.apply(BigQueryIO.read(
(SchemaAndRecord elem) -> (Double) elem.getRecord().get("field"))
.fromQuery(
"SELECT field FROM `project-id.dataset.table`")
.usingStandardSql()
.withCoder(DoubleCoder.of()));
pCollection
.apply("Log words", ParDo.of(new LogOutput<>()));
*/

pipeline.run();
PCollection<TableRow> rowsFromBigQuery = p.apply(bigqueryIO);

rowsFromBigQuery
.apply(ParDo.of(new LogOutput<>("Result: ")));*/
}

public static void runBigQueryTornadoes(PipelineOptions options) {
Pipeline p = Pipeline.create(options);
applyBigQueryTornadoes(p);
p.run().waitUntilFinish();
}

public static void main(String[] args) {
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
runBigQueryTornadoes(options);
}

static class LogOutput<T> extends DoFn<T, T> {
private final String prefix;

LogOutput() {
this.prefix = "Processing element";
}

LogOutput(String prefix) {
this.prefix = prefix;
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info(prefix + ": {}", c.element());
public void processElement(ProcessContext c) {
LOG.info(prefix + c.element());
c.output(c.element());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

# beam-playground:
# name: read-query
# description: TextIO read query example.
# description: BigQueryIO read query example.
# multifile: false
# never_run: true
# always_run: true
# context_line: 34
# categories:
# - Quickstart
Expand All @@ -26,39 +28,43 @@
# - hellobeam

import argparse
import os
import warnings

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions
from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery

class WeatherData:
    """Value object holding one weather-station record read from BigQuery.

    Carries the station identifier, its WBAN number, and the record's
    date parts, exactly as they come out of the source row.
    """

    def __init__(self, station_number, wban_number, year, month, day):
        # Store each column verbatim; no validation or coercion is performed.
        self.station_number, self.wban_number = station_number, wban_number
        self.year, self.month, self.day = year, month, day

    def __str__(self):
        # One-line human-readable summary, used when the pipeline prints rows.
        return (
            "Weather Data: Station {0} (WBAN {1}), Date: {2}-{3}-{4}".format(
                self.station_number,
                self.wban_number,
                self.year,
                self.month,
                self.day,
            )
        )

def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='gs://bucket',
help='Input file to process.')

known_args, pipeline_args = parser.parse_known_args(argv)

pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(PipelineOptions)

"""
(p | 'ReadTable' >> ReadFromBigQuery(query='SELECT * FROM project-id.dataset.table') - This part of the
pipeline reads from a BigQuery table using a SQL query and processes the result. The ReadFromBigQuery(
query='SELECT * FROM project-id.dataset.table') function is used to read from BigQuery. 'LogOutput' >>
beam.Map(lambda elem: print(f"Processing element: {elem['field']}"))) - This part of the pipeline processes the
PCollection and logs the output to the console. It prints the 'field' column from each row in the table.
"""

with beam.Pipeline(options=pipeline_options) as p:
(p #| 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT * FROM `project-id.dataset.table`')))
# Each row is a dictionary where the keys are the BigQuery columns
#| beam.Map(lambda elem: elem['field'])
)
with beam.Pipeline(options=pipeline_options, argv=argv) as p:
(p
# | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query='select * from `apache-beam-testing.clouddataflow_samples.weather_stations`',use_standard_sql=True,
# method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
# | beam.combiners.Sample.FixedSizeGlobally(5)
# | beam.FlatMap(lambda line: line)
# | beam.Map(lambda element: WeatherData(element['station_number'],element['wban_number'],element['year'],element['month'],element['day']))
# | beam.Map(print)
)


if __name__ == '__main__':
run()
run()
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
// complexity: ADVANCED
// tags:
// - hellobeam

package main

import (
Expand Down Expand Up @@ -62,7 +63,7 @@ func main() {
s := p.Root()
project := "apache-beam-testing"

// Build a PCollection<CommentRow> by querying BigQuery.
// Build a PCollection<Game> by querying BigQuery.
rows := bigqueryio.Read(s, project, "bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{}))

fixedSizeLines := top.Largest(s, rows, 5, less)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

# beam-playground:
# name: read-table
# description: TextIO read table example.
# description: BigQueryIO read table example.
# multifile: false
# never_run: true
# always_run: true
Expand All @@ -28,13 +28,8 @@
# - hellobeam

import argparse
import os
import warnings

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions
#from google.cloud import bigquery
from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery

class WeatherData:
def __init__(self, station_number, wban_number, year, month, day):
Expand All @@ -57,7 +52,7 @@ def run(argv=None):


with beam.Pipeline(options=pipeline_options, argv=argv) as p:
(p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations',
(p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations',
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
| beam.combiners.Sample.FixedSizeGlobally(5)
| beam.FlatMap(lambda line: line)
Expand Down