Skip to content

Commit 0b154de

Browse files
authored
Merge pull request #6 from MichaelC1999/v0.0.7
Map module support and deprecate store logic, docs
2 parents 79a1216 + 2c1d5d0 commit 0b154de

File tree

4 files changed

+55
-36
lines changed

4 files changed

+55
-36
lines changed

README.md

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,36 @@ In order to poll the substream, you will need to call the `poll()` function on t
3838
# View available modules on .spkg
3939
print(sb.output_modules)
4040

41-
# Poll the module and return a list of SubstreamOutput objects in the order of teh specified modules
41+
# Poll the module and return a list of SubstreamOutput objects in the order of the specified modules
4242
result = sb.poll(["store_swap_events"], start_block=10000835, end_block=10000835+20000)
4343
```
4444

45-
The result here is a `SubstreamOutput` object, you can access both the `data` and `snapshots` dataframes by doing:
45+
With the default inputs, this function outputs Pandas DataFrames after streaming all blocks between the start_block and end_block. However, depending on how this function is called, a dict object is returned. The `poll()` function has a number of inputs
46+
47+
- output_modules
48+
- List of strings of output modules to stream
49+
- start_block
50+
- Integer block number to start the polling
51+
- end_block
52+
- Integer block number to end the polling. In theory, there is no max block number as any block number past chain head will stream the blocks in real time. It's recommended to use an end_block far off into the future if building a data app that will be streaming data in real time as blocks finalize, such as block 20,000,000
53+
- stream_callback
54+
- An optional callback function to be passed into the polling function to execute when valid streamed data is received
55+
- return_first_result
56+
- Boolean value that if True will return data on the first block after the start block to have an applicable TX/Event.
57+
- Can be called recursively on the front end while incrementing the start_block to return data as it's streamed rather than all data at once after streaming is completed
58+
- Defaults to False
59+
- If True, the data is returned in the format {"data": [], "module_name": String, "data_block": int}
60+
- initial_snapshot
61+
- Boolean value, defaults to False
62+
- highest_processed_block
63+
- Integer block number that is used in measuring indexing and processing progress, in cases where return_progress is True
64+
- Defaults to 0
65+
- return_progress
66+
- Boolean value that if True returns progress updates while blocks are being processed
67+
- Defaults to False
68+
69+
70+
The result here is the default `SubstreamOutput` object, you can access both the `data` and `snapshots` dataframes by doing:
4671

4772
```python
4873
# These will return pandas DataFrames

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "substreams"
7-
version = "0.0.6"
7+
version = "0.0.7"
88
authors = [
99
{ name="Ryan Sudhakaran", email="[email protected]" },
1010
{ name="Michael Carroll", email="[email protected]" },

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
setup(
77
name="substreams",
8-
version="0.0.6",
8+
version="0.0.7",
99
packages=[".substreams"],
1010
author="Ryan Sudhakaran",
1111
author_email="[email protected]",

substreams/substream.py

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -126,25 +126,12 @@ def _parse_snapshot_deltas(self, snapshot: dict) -> list[dict]:
126126
for x in snapshot["deltas"].get("deltas", list())
127127
]
128128

129-
def _parse_data_deltas(self, data: dict) -> list[dict]:
130-
module_name: str = data["outputs"][0]["name"]
131-
obj_class = self._class_from_module(module_name)
132-
deltas = list()
133-
for output in data["outputs"]:
134-
store_deltas = output["storeDeltas"]
135-
if store_deltas:
136-
raw_deltas = store_deltas["deltas"]
137-
for delta in raw_deltas:
138-
raw = delta["newValue"]
139-
key = delta["key"]
140-
d = self._parse_from_string(raw, key, obj_class)
141-
d.update(data["clock"])
142-
deltas.append(d)
143-
return deltas
144-
145-
def _parse_data_outputs(self, data: dict) -> list[dict]:
129+
def _parse_data_outputs(self, data: dict, module_names: list[str]) -> list[dict]:
146130
outputs = list()
131+
module_set = set(module_names)
147132
for output in data["outputs"]:
133+
if "mapOutput" not in output or output["name"] not in module_set:
134+
continue
148135
map_output = output["mapOutput"]
149136
for key, items in map_output.items():
150137
if key == "items":
@@ -157,11 +144,8 @@ def output_modules(self) -> dict[str, Any]:
157144
module_map = {}
158145
for module in self.pkg.modules.ListFields()[0][1]:
159146
map_output_type = module.kind_map.output_type
160-
store_output_type = module.kind_store.value_type
161147
if map_output_type != "":
162148
output_type = map_output_type
163-
else:
164-
output_type = store_output_type
165149

166150
module_map[module.name] = {
167151
"is_map": map_output_type != "",
@@ -183,17 +167,18 @@ def poll(
183167
output_modules: list[str],
184168
start_block: int,
185169
end_block: int,
186-
stream_callback=None,
187-
return_first_result=False,
188-
initial_snapshot=False,
170+
stream_callback: Optional[callable] = None,
171+
return_first_result: bool = False,
172+
initial_snapshot: bool = False,
189173
highest_processed_block: int = 0,
190-
return_progress=False
174+
return_progress: bool = False,
191175
):
192176
from sf.substreams.v1.substreams_pb2 import STEP_IRREVERSIBLE, Request
193-
194177
for module in output_modules:
195178
if module not in self.output_modules:
196179
raise Exception(f"module '{module}' is not supported for {self.name}")
180+
if self.output_modules[module].get('is_map') is False:
181+
raise Exception(f"module '{module}' is not a map module")
197182
self._class_from_module(module)
198183

199184
stream = self.service.Blocks(
@@ -211,30 +196,39 @@ def poll(
211196
raw_results = defaultdict(lambda: {"data": list(), "snapshots": list()})
212197
results = []
213198
data_block = 0
214-
module_name: str = ""
199+
module_name = ""
200+
215201
try:
216202
for response in stream:
217203
snapshot = MessageToDict(response.snapshot_data)
218204
data = MessageToDict(response.data)
219205
progress = MessageToDict(response.progress)
206+
session = MessageToDict(response.session)
207+
208+
if session:
209+
continue
210+
220211
if snapshot:
221212
module_name = snapshot["moduleName"]
222213
snapshot_deltas = self._parse_snapshot_deltas(snapshot)
223214
raw_results[module_name]["snapshots"].extend(snapshot_deltas)
215+
224216
if data:
225-
if self.output_modules[module]["is_map"]:
226-
parsed = self._parse_data_outputs(data)
227-
else:
228-
parsed = self._parse_data_deltas(data)
217+
parsed = self._parse_data_outputs(data, output_modules)
229218
module_name = data["outputs"][0]["name"]
230219
raw_results[module_name]["data"].extend(parsed)
231220
data_block = data["clock"]["number"]
232221
if len(parsed) > 0:
222+
parsed = [dict(item, **{'block':data_block}) for item in parsed]
233223
if return_first_result is True:
234224
break
235225
if callable(stream_callback):
236226
stream_callback(module_name, parsed)
227+
else:
228+
continue
237229
elif progress and return_progress is True:
230+
if 'processedBytes' in progress["modules"][0] or 'processedRanges' not in progress["modules"][0]:
231+
continue
238232
endBlock = int(progress["modules"][0]['processedRanges']['processedRanges'][0]['endBlock'])
239233
data_block = endBlock
240234
if endBlock > highest_processed_block + 100 and progress["modules"][0]['name'] == output_modules[0]:
@@ -249,6 +243,6 @@ def poll(
249243
df["output_module"] = output_module
250244
setattr(result, k, df)
251245
results.append(result)
252-
except Exception as e:
253-
results.append({"error": e})
246+
except Exception as err:
247+
results = {"error": err}
254248
return results

0 commit comments

Comments
 (0)