@@ -138,6 +138,18 @@ def _parse_data_deltas(self, data: dict) -> list[dict]:
138138 deltas .append (d )
139139 return deltas
140140
141+ def _parse_data_outputs (self , data : dict ) -> list [dict ]:
142+ module_name : str = data ["outputs" ][0 ]["name" ]
143+ obj_class = self ._class_from_module (module_name )
144+ outputs = list ()
145+ for output in data ["outputs" ]:
146+ map_output = output ["mapOutput" ]
147+ for key , items in map_output .items ():
148+ if key == "items" :
149+ for item in items :
150+ outputs .append (item )
151+ return outputs
152+
141153 @cached_property
142154 def output_modules (self ) -> dict [str , Any ]:
143155 module_map = {}
@@ -148,7 +160,9 @@ def output_modules(self) -> dict[str, Any]:
148160 output_type = map_output_type
149161 else :
150162 output_type = store_output_type
163+
151164 module_map [module .name ] = {
165+ "is_map" : map_output_type != "" ,
152166 "output_type" : output_type ,
153167 "initial_block" : module .initial_block ,
154168 }
@@ -163,21 +177,23 @@ def proto_file_map(self) -> dict[str, DescriptorProto]:
163177 return name_map
164178
165179 # TODO how do I type annotate this stuff?
166- def poll (self , output_modules : list [str ], start_block : int , end_block : int ):
180+ def poll (self , output_modules : list [str ], start_block : int , end_block : int , initial_snapshot = False ):
167181 # TODO make this general
168182 from sf .substreams .v1 .substreams_pb2 import STEP_IRREVERSIBLE , Request
169183
170184 for module in output_modules :
171185 if module not in self .output_modules :
172186 raise Exception (f"module '{ module } ' is not supported for { self .name } " )
187+ self ._class_from_module (module )
188+
173189 stream = self .service .Blocks (
174190 Request (
175191 start_block_num = start_block ,
176192 stop_block_num = end_block ,
177193 fork_steps = [STEP_IRREVERSIBLE ],
178194 modules = self .pkg .modules ,
179195 output_modules = output_modules ,
180- initial_store_snapshot_for_modules = output_modules ,
196+ initial_store_snapshot_for_modules = output_modules if initial_snapshot else None ,
181197 )
182198 )
183199 raw_results = defaultdict (lambda : {"data" : list (), "snapshots" : list ()})
@@ -190,10 +206,15 @@ def poll(self, output_modules: list[str], start_block: int, end_block: int):
190206 raw_results [module_name ]["snapshots" ].extend (snapshot_deltas )
191207 if data :
192208 print ('data block #' , data ["clock" ]["number" ])
193- module_name : str = data ["outputs" ][0 ]["name" ]
194- data_deltas = self ._parse_data_deltas (data )
195- raw_results [module_name ]["data" ].extend (data_deltas )
196- print ('FINISH STREAM' )
209+ if self .output_modules [module ]["is_map" ]:
210+ module_name : str = data ["outputs" ][0 ]["name" ]
211+ data_outputs = self ._parse_data_outputs (data )
212+ raw_results [module_name ]["data" ].extend (data_outputs )
213+ else :
214+ module_name : str = data ["outputs" ][0 ]["name" ]
215+ data_deltas = self ._parse_data_deltas (data )
216+ raw_results [module_name ]["data" ].extend (data_deltas )
217+
197218 results = []
198219 for output_module in output_modules :
199220 result = SubstreamOutput (module_name = output_module )
0 commit comments