Skip to content

Commit eed71a6

Browse files
authored
Merge pull request #2 from messari/vincent/map-module
Support map modules and made initial snapshot optional
2 parents 7396f02 + c08a656 commit eed71a6

File tree

1 file changed

+27
-6
lines changed

1 file changed

+27
-6
lines changed

substreams/substream.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,18 @@ def _parse_data_deltas(self, data: dict) -> list[dict]:
138138
deltas.append(d)
139139
return deltas
140140

141+
def _parse_data_outputs(self, data: dict) -> list[dict]:
142+
module_name: str = data["outputs"][0]["name"]
143+
obj_class = self._class_from_module(module_name)
144+
outputs = list()
145+
for output in data["outputs"]:
146+
map_output = output["mapOutput"]
147+
for key, items in map_output.items():
148+
if key == "items":
149+
for item in items:
150+
outputs.append(item)
151+
return outputs
152+
141153
@cached_property
142154
def output_modules(self) -> dict[str, Any]:
143155
module_map = {}
@@ -148,7 +160,9 @@ def output_modules(self) -> dict[str, Any]:
148160
output_type = map_output_type
149161
else:
150162
output_type = store_output_type
163+
151164
module_map[module.name] = {
165+
"is_map": map_output_type != "",
152166
"output_type": output_type,
153167
"initial_block": module.initial_block,
154168
}
@@ -163,21 +177,23 @@ def proto_file_map(self) -> dict[str, DescriptorProto]:
163177
return name_map
164178

165179
# TODO how do I type annotate this stuff?
166-
def poll(self, output_modules: list[str], start_block: int, end_block: int):
180+
def poll(self, output_modules: list[str], start_block: int, end_block: int, initial_snapshot = False):
167181
# TODO make this general
168182
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
169183

170184
for module in output_modules:
171185
if module not in self.output_modules:
172186
raise Exception(f"module '{module}' is not supported for {self.name}")
187+
self._class_from_module(module)
188+
173189
stream = self.service.Blocks(
174190
Request(
175191
start_block_num=start_block,
176192
stop_block_num=end_block,
177193
fork_steps=[STEP_IRREVERSIBLE],
178194
modules=self.pkg.modules,
179195
output_modules=output_modules,
180-
initial_store_snapshot_for_modules=output_modules,
196+
initial_store_snapshot_for_modules=output_modules if initial_snapshot else None,
181197
)
182198
)
183199
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
@@ -190,10 +206,15 @@ def poll(self, output_modules: list[str], start_block: int, end_block: int):
190206
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
191207
if data:
192208
print('data block #', data["clock"]["number"])
193-
module_name: str = data["outputs"][0]["name"]
194-
data_deltas = self._parse_data_deltas(data)
195-
raw_results[module_name]["data"].extend(data_deltas)
196-
print('FINISH STREAM')
209+
if self.output_modules[module]["is_map"]:
210+
module_name: str = data["outputs"][0]["name"]
211+
data_outputs = self._parse_data_outputs(data)
212+
raw_results[module_name]["data"].extend(data_outputs)
213+
else:
214+
module_name: str = data["outputs"][0]["name"]
215+
data_deltas = self._parse_data_deltas(data)
216+
raw_results[module_name]["data"].extend(data_deltas)
217+
197218
results = []
198219
for output_module in output_modules:
199220
result = SubstreamOutput(module_name=output_module)

0 commit comments

Comments
 (0)