Skip to content

Commit 3a56ce7

Browse files
committed
Small fixes in BigQuery snippets and wordcount example.
1 parent 8e90369 commit 3a56ce7

File tree

2 files changed

+12
-33
lines changed

2 files changed

+12
-33
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -332,10 +332,10 @@ you can write to.
332332

333333
```python
334334
import google.cloud.dataflow as df
335-
# The output table needs to point to something in your project.
336-
output_table = 'YOUR_PROJECT:DATASET.TABLE'
337335
input_table = 'clouddataflow-readonly:samples.weather_stations'
338-
p = df.Pipeline('DirectPipelineRunner')
336+
project = 'YOUR-PROJECT'
337+
output_table = '%s:DATASET.TABLENAME' % project
338+
p = df.Pipeline(argv=['--project', project])
339339
(p
340340
| df.Read('read', df.io.BigQuerySource(input_table))
341341
| df.FlatMap(
@@ -357,12 +357,12 @@ of using the whole table.
357357

358358
```python
359359
import google.cloud.dataflow as df
360-
# The output table needs to point to something in your project.
361-
output_table = 'YOUR_PROJECT:DATASET.TABLE'
360+
project = 'YOUR-PROJECT'
361+
output_table = '%s:DATASET.TABLENAME' % project
362362
input_query = 'SELECT month, COUNT(month) AS tornado_count ' \
363363
'FROM [clouddataflow-readonly:samples.weather_stations] ' \
364364
'WHERE tornado=true GROUP BY month'
365-
p = df.Pipeline('DirectPipelineRunner')
365+
p = df.Pipeline(argv=['--project', project])
366366
(p
367367
| df.Read('read', df.io.BigQuerySource(query=input_query))
368368
| df.Write('write', df.io.BigQuerySink(

google/cloud/dataflow/examples/wordcount_debugging.py

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -78,23 +78,6 @@ def process(self, context):
7878
context.aggregate_to(self.umatched_words, 1)
7979

8080

81-
class AssertEqualsIgnoringOrderFn(df.DoFn):
82-
"""A DoFn that asserts that its input is the same as the expected value.
83-
84-
This DoFn is useful only for testing purposes with small data sets. It will
85-
materialize all of its input and assumes that its input is a singleton.
86-
"""
87-
88-
def __init__(self, expected_elements):
89-
super(AssertEqualsIgnoringOrderFn, self).__init__()
90-
self.expected_elements = expected_elements
91-
92-
def process(self, context):
93-
assert sorted(context.element) == sorted(self.expected_elements), (
94-
'AssertEqualsIgnoringOrderFn input does not match expected value.'
95-
'%s != %s' % (context.element, self.expected_elements))
96-
97-
9881
class CountWords(df.PTransform):
9982
"""A transform to count the occurrences of each word.
10083
@@ -136,19 +119,15 @@ def run(argv=sys.argv[1:]):
136119
p | df.io.Read('read', df.io.TextFileSource(known_args.input))
137120
| CountWords() | df.ParDo('FilterText', FilterTextFn('Flourish|stomach')))
138121

139-
# AssertEqualsIgnoringOrderFn is a convenient DoFn to validate its input.
140-
# Asserts are best used in unit tests with small data sets but is demonstrated
141-
# here as a teaching tool.
122+
# assert_that is a convenient PTransform that checks a PCollection has an
123+
# expected value. Asserts are best used in unit tests with small data sets but
124+
# is demonstrated here as a teaching tool.
142125
#
143-
# Note AssertEqualsIgnoringOrderFn does not provide any output and that
144-
# successful completion of the Pipeline implies that the expectations were
145-
# met. Learn more at
126+
# Note assert_that does not provide any output and that successful completion
127+
# of the Pipeline implies that the expectations were met. Learn more at
146128
# https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to
147129
# test your pipeline.
148-
# pylint: disable=expression-not-assigned
149-
(filtered_words
150-
| df.transforms.combiners.ToList('ToList')
151-
| df.ParDo(AssertEqualsIgnoringOrderFn([('Flourish', 3), ('stomach', 1)])))
130+
df.assert_that(filtered_words, df.equal_to([('Flourish', 3), ('stomach', 1)]))
152131

153132
# Format the counts into a PCollection of strings and write the output using a
154133
# "Write" transform that has side effects.

0 commit comments

Comments (0)