@@ -250,19 +250,91 @@ def match(actual):
250250 assert_that (results , matcher (1 , a_list , some_pairs ))
251251 pipeline .run ()
252252
253- def test_as_list_without_unique_labels (self ):
254- a_list = [1 , 2 , 3 ]
253+ def test_as_singleton_without_unique_labels (self ):
254+ # This should succeed as calling AsSingleton on the same PCollection twice
255+ # with the same defaults will return the same PCollectionView.
256+ a_list = [2 ]
257+ pipeline = Pipeline ('DirectPipelineRunner' )
258+ main_input = pipeline | Create ('main input' , [1 ])
259+ side_list = pipeline | Create ('side list' , a_list )
260+ results = main_input | FlatMap (
261+ 'test' ,
262+ lambda x , s1 , s2 : [[x , s1 , s2 ]],
263+ AsSingleton (side_list ), AsSingleton (side_list ))
264+
265+ def matcher (expected_elem , expected_singleton ):
266+ def match (actual ):
267+ [[actual_elem , actual_singleton1 , actual_singleton2 ]] = actual
268+ equal_to ([expected_elem ])([actual_elem ])
269+ equal_to ([expected_singleton ])([actual_singleton1 ])
270+ equal_to ([expected_singleton ])([actual_singleton2 ])
271+ return match
272+
273+ assert_that (results , matcher (1 , 2 ))
274+ pipeline .run ()
275+
276+ def test_as_singleton_with_different_defaults_without_unique_labels (self ):
277+ # This should fail as AsSingleton with distinct default values should create
278+ # distinct PCollectionViews with the same full_label.
279+ a_list = [2 ]
255280 pipeline = Pipeline ('DirectPipelineRunner' )
256281 main_input = pipeline | Create ('main input' , [1 ])
257282 side_list = pipeline | Create ('side list' , a_list )
283+
258284 with self .assertRaises (RuntimeError ) as e :
259285 _ = main_input | FlatMap (
260286 'test' ,
261- lambda x , ls1 , ls2 : [[x , ls1 , ls2 ]],
262- AsList (side_list ), AsList (side_list ))
287+ lambda x , s1 , s2 : [[x , s1 , s2 ]],
288+ AsSingleton (side_list ), AsSingleton (side_list , default_value = 3 ))
263289 self .assertTrue (
264290 e .exception .message .startswith (
265- 'Transform "AsList" does not have a stable unique label.' ))
291+ 'Transform "ViewAsSingleton(side list.None)" does not have a '
292+ 'stable unique label.' ))
293+
294+ def test_as_singleton_with_different_defaults_with_unique_labels (self ):
295+ a_list = []
296+ pipeline = Pipeline ('DirectPipelineRunner' )
297+ main_input = pipeline | Create ('main input' , [1 ])
298+ side_list = pipeline | Create ('side list' , a_list )
299+ results = main_input | FlatMap (
300+ 'test' ,
301+ lambda x , s1 , s2 : [[x , s1 , s2 ]],
302+ AsSingleton ('si1' , side_list , default_value = 2 ),
303+ AsSingleton ('si2' , side_list , default_value = 3 ))
304+
305+ def matcher (expected_elem , expected_singleton1 , expected_singleton2 ):
306+ def match (actual ):
307+ [[actual_elem , actual_singleton1 , actual_singleton2 ]] = actual
308+ equal_to ([expected_elem ])([actual_elem ])
309+ equal_to ([expected_singleton1 ])([actual_singleton1 ])
310+ equal_to ([expected_singleton2 ])([actual_singleton2 ])
311+ return match
312+
313+ assert_that (results , matcher (1 , 2 , 3 ))
314+ pipeline .run ()
315+
316+ def test_as_list_without_unique_labels (self ):
317+ # This should succeed as calling AsList on the same PCollection twice will
318+ # return the same PCollectionView.
319+ a_list = [1 , 2 , 3 ]
320+ pipeline = Pipeline ('DirectPipelineRunner' )
321+ main_input = pipeline | Create ('main input' , [1 ])
322+ side_list = pipeline | Create ('side list' , a_list )
323+ results = main_input | FlatMap (
324+ 'test' ,
325+ lambda x , ls1 , ls2 : [[x , ls1 , ls2 ]],
326+ AsList (side_list ), AsList (side_list ))
327+
328+ def matcher (expected_elem , expected_list ):
329+ def match (actual ):
330+ [[actual_elem , actual_list1 , actual_list2 ]] = actual
331+ equal_to ([expected_elem ])([actual_elem ])
332+ equal_to (expected_list )(actual_list1 )
333+ equal_to (expected_list )(actual_list2 )
334+ return match
335+
336+ assert_that (results , matcher (1 , [1 , 2 , 3 ]))
337+ pipeline .run ()
266338
267339 def test_as_list_with_unique_labels (self ):
268340 a_list = [1 , 2 , 3 ]
@@ -282,6 +354,9 @@ def match(actual):
282354 equal_to (expected_list )(actual_list2 )
283355 return match
284356
357+ assert_that (results , matcher (1 , [1 , 2 , 3 ]))
358+ pipeline .run ()
359+
285360 def test_as_dict_with_unique_labels (self ):
286361 some_kvs = [('a' , 1 ), ('b' , 2 )]
287362 pipeline = Pipeline ('DirectPipelineRunner' )
0 commit comments