@@ -78,23 +78,6 @@ def process(self, context):
7878 context .aggregate_to (self .umatched_words , 1 )
7979
8080
81- class AssertEqualsIgnoringOrderFn (df .DoFn ):
82- """A DoFn that asserts that its input is the same as the expected value.
83-
84- This DoFn is useful only for testing purposes with small data sets. It will
85- materialize all of its input and assumes that its input is a singleton.
86- """
87-
88- def __init__ (self , expected_elements ):
89- super (AssertEqualsIgnoringOrderFn , self ).__init__ ()
90- self .expected_elements = expected_elements
91-
92- def process (self , context ):
93- assert sorted (context .element ) == sorted (self .expected_elements ), (
94- 'AssertEqualsIgnoringOrderFn input does not match expected value.'
95- '%s != %s' % (context .element , self .expected_elements ))
96-
97-
9881class CountWords (df .PTransform ):
9982 """A transform to count the occurrences of each word.
10083
@@ -136,19 +119,15 @@ def run(argv=sys.argv[1:]):
136119 p | df .io .Read ('read' , df .io .TextFileSource (known_args .input ))
137120 | CountWords () | df .ParDo ('FilterText' , FilterTextFn ('Flourish|stomach' )))
138121
139- # AssertEqualsIgnoringOrderFn is a convenient DoFn to validate its input.
140- # Asserts are best used in unit tests with small data sets but is demonstrated
141- # here as a teaching tool.
122+ # assert_that is a convenient PTransform that checks a PCollection has an
123+ # expected value. Asserts are best used in unit tests with small data sets but
124+ # is demonstrated here as a teaching tool.
142125 #
143- # Note AssertEqualsIgnoringOrderFn does not provide any output and that
144- # successful completion of the Pipeline implies that the expectations were
145- # met. Learn more at
126+ # Note assert_that does not provide any output and that successful completion
127+ # of the Pipeline implies that the expectations were met. Learn more at
146128 # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to
147129 # test your pipeline.
148- # pylint: disable=expression-not-assigned
149- (filtered_words
150- | df .transforms .combiners .ToList ('ToList' )
151- | df .ParDo (AssertEqualsIgnoringOrderFn ([('Flourish' , 3 ), ('stomach' , 1 )])))
130+ df .assert_that (filtered_words , df .equal_to ([('Flourish' , 3 ), ('stomach' , 1 )]))
152131
153132 # Format the counts into a PCollection of strings and write the output using a
154133 # "Write" transform that has side effects.
0 commit comments