-
Notifications
You must be signed in to change notification settings - Fork 76
Expand file tree
/
Copy pathpyfunc_plugin.py
More file actions
278 lines (248 loc) · 8.72 KB
/
pyfunc_plugin.py
File metadata and controls
278 lines (248 loc) · 8.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""GStreamer plugin to execute user-defined Python functions.
Can be used for metadata conversion, inference post-processing, and
other tasks.
"""
import itertools
from typing import Any, Optional
from savant_rs.pipeline2 import VideoPipeline
from gst_plugins.python.pyfunc_common import handle_fatal_error, init_pyfunc
from savant.base.pyfunc import BasePyFuncPlugin, PyFunc
from savant.gstreamer import GLib, GObject, Gst, GstBase # noqa: F401
from savant.utils.log import LoggerMixin
# Pad capabilities shared by every pad template of the element.
# The pixel format is pinned to RGBA because it is required to access
# the frame (pyds.get_nvds_buf_surface).
CAPS = Gst.Caps.from_string(
    ', '.join(
        (
            'video/x-raw(memory:NVMM)',
            'format={RGBA}',
            f'width={Gst.IntRange(range(1, GLib.MAXINT))}',
            f'height={Gst.IntRange(range(1, GLib.MAXINT))}',
            f'framerate={Gst.FractionRange(Gst.Fraction(0, 1), Gst.Fraction(GLib.MAXINT, 1))}',
        )
    )
)
class GstPluginPyFunc(LoggerMixin, GstBase.BaseTransform):
    """PyFunc GStreamer plugin.

    In-place transform element that instantiates a user-defined class
    (selected with the ``module``, ``class`` and ``kwargs`` properties)
    and forwards element callbacks to it: ``on_start()`` / ``on_stop()``
    on start/stop, ``on_event()`` for every sink event and
    ``process_buffer()`` for every buffer.

    Errors raised by the user code are routed through
    ``handle_fatal_error``.
    NOTE(review): the last two arguments passed to ``handle_fatal_error``
    look like the values to return when the error is tolerated (dev mode)
    and when it is fatal, respectively - confirm against ``pyfunc_common``.
    """

    GST_PLUGIN_NAME: str = 'pyfunc'

    __gstmetadata__ = (
        'GStreamer element to execute user-defined Python function',
        'Transform',
        'Provides a callback to execute user-defined Python functions on every frame. '
        'Can be used for metadata conversion, inference post-processing, etc.',
        'Den Medyantsev <[email protected]>',
    )

    __gsttemplates__ = (
        Gst.PadTemplate.new(
            'sink', Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, CAPS
        ),
        Gst.PadTemplate.new('src', Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, CAPS),
        # Additional source pads are created on request,
        # see do_request_new_pad().
        Gst.PadTemplate.new(
            'aux_src_%u', Gst.PadDirection.SRC, Gst.PadPresence.REQUEST, CAPS
        ),
    )

    __gproperties__ = {
        'module': (
            str,
            'Python module',
            'Python module name to import or module path.',
            None,
            GObject.ParamFlags.READWRITE,
        ),
        'class': (
            str,
            'Python class name',
            'Python class name to instantiate.',
            None,
            GObject.ParamFlags.READWRITE,
        ),
        'kwargs': (
            str,
            'Keyword arguments for class initialization',
            'Keyword argument for Python class initialization.',
            None,
            GObject.ParamFlags.READWRITE,
        ),
        'pipeline': (
            object,
            'VideoPipeline object from savant-rs.',
            'VideoPipeline object from savant-rs.',
            GObject.ParamFlags.READWRITE,
        ),
        'gst-pipeline': (
            object,
            'GstPipeline object.',
            'GstPipeline object.',
            GObject.ParamFlags.READWRITE,
        ),
        'stream-pool-size': (
            int,
            'Max stream pool size',
            'Max stream pool size',
            1,
            GLib.MAXINT,
            1,
            GObject.ParamFlags.READWRITE,
        ),
        'dev-mode': (
            bool,
            'Dev mode flag',
            (
                'Whether to monitor source file changes at runtime'
                ' and reload the pyfunc objects when necessary.'
            ),
            False,
            GObject.ParamFlags.READWRITE,
        ),
    }

    def __init__(self):
        """Initialize property storage with the defaults."""
        super().__init__()
        # properties
        self.module: Optional[str] = None
        self.class_name: Optional[str] = None
        self.kwargs: Optional[str] = None
        self.video_pipeline: Optional[VideoPipeline] = None
        self.gst_pipeline: Optional['GstPipeline'] = None  # noqa: F821
        self.dev_mode: bool = False
        self.max_stream_pool_size: int = 1
        # pyfunc object, created in do_start()
        self.pyfunc: Optional[PyFunc] = None
        # source of the indices substituted into the 'aux_src_%u' pad names
        self._aux_pad_idx_gen = itertools.count()

    def do_get_property(self, prop: GObject.GParamSpec) -> Any:
        """Gst plugin get property function.

        :param prop: structure that encapsulates the parameter info
        :return: current value of the requested property
        :raises AttributeError: if the property name is not known
        """
        if prop.name == 'module':
            return self.module
        if prop.name == 'class':
            return self.class_name
        if prop.name == 'kwargs':
            return self.kwargs
        if prop.name == 'pipeline':
            return self.video_pipeline
        if prop.name == 'gst-pipeline':
            return self.gst_pipeline
        if prop.name == 'stream-pool-size':
            return self.max_stream_pool_size
        if prop.name == 'dev-mode':
            return self.dev_mode
        raise AttributeError(f'Unknown property {prop.name}.')

    def do_set_property(self, prop: GObject.GParamSpec, value: Any):
        """Gst plugin set property function.

        :param prop: structure that encapsulates the parameter info
        :param value: new value for the parameter, type depends on the parameter
        :raises AttributeError: if the property name is not known
        """
        if prop.name == 'module':
            self.module = value
        elif prop.name == 'class':
            self.class_name = value
        elif prop.name == 'kwargs':
            self.kwargs = value
        elif prop.name == 'pipeline':
            self.video_pipeline = value
        elif prop.name == 'gst-pipeline':
            self.gst_pipeline = value
        elif prop.name == 'stream-pool-size':
            self.max_stream_pool_size = value
        elif prop.name == 'dev-mode':
            self.dev_mode = value
        else:
            raise AttributeError(f'Unknown property {prop.name}.')

    def do_start(self) -> bool:
        """Do on plugin start.

        Creates the pyfunc from the ``module`` / ``class`` / ``kwargs``
        properties, checks that the created instance is a
        ``BasePyFuncPlugin``, attaches this element to it and calls its
        ``on_start()``.

        :return: result of the user ``on_start()`` call, or the value
            produced by ``handle_fatal_error`` on failure
        """
        if not self.module or not self.class_name:
            return handle_fatal_error(
                self,
                self.logger,
                None,
                'Module and class name should be specified.',
                self.dev_mode,
                True,
                False,
            )

        self.pyfunc = init_pyfunc(
            self, self.logger, self.module, self.class_name, self.kwargs, self.dev_mode
        )

        # pylint: disable=broad-exception-caught
        try:
            # Explicit check instead of `assert`: assertions are removed when
            # Python runs with -O, which would let an object of a wrong type
            # through to the calls below.
            if not isinstance(self.pyfunc.instance, BasePyFuncPlugin):
                raise TypeError(
                    f'"{self.pyfunc}" should be an instance of "BasePyFuncPlugin" subclass.'
                )
            self.pyfunc.instance.gst_element = self
            return self.pyfunc.instance.on_start()
        except Exception as exc:
            return handle_fatal_error(
                self,
                self.logger,
                exc,
                f'Error in on_start() call for {self.pyfunc}',
                self.dev_mode,
                True,
                False,
            )

    def do_stop(self) -> bool:
        """Do on plugin stop.

        :return: result of the user ``on_stop()`` call, or the value
            produced by ``handle_fatal_error`` on failure
        """
        # pylint: disable=broad-exception-caught
        try:
            return self.pyfunc.instance.on_stop()
        except Exception as exc:
            return handle_fatal_error(
                self,
                self.logger,
                exc,
                f'Error in do_stop() call for {self.pyfunc}',
                self.dev_mode,
                True,
                False,
            )

    def do_sink_event(self, event: Gst.Event) -> bool:
        """Do on sink event.

        Passes the event to the user ``on_event()`` and then pushes it
        to the source pad.

        :param event: event received on the sink pad
        :return: ``False`` if the user callback failed and the error was
            reported as fatal, otherwise the result of pushing the event
            to the source pad
        """
        # pylint: disable=broad-exception-caught
        try:
            self.pyfunc.instance.on_event(event)
        except Exception as exc:
            res = handle_fatal_error(
                self,
                self.logger,
                exc,
                f'Error in do_sink_event() call for {self.pyfunc}.',
                self.dev_mode,
                True,
                False,
            )
            if not res:
                return False
        # The event is pushed downstream both on success and when
        # the error was tolerated.
        return self.srcpad.push_event(event)

    def do_transform_ip(self, buffer: Gst.Buffer) -> Gst.FlowReturn:
        """Transform buffer in-place function.

        :param buffer: buffer handed to the user ``process_buffer()``
        :return: ``Gst.FlowReturn.OK`` on success, otherwise the value
            produced by ``handle_fatal_error``
            (``Gst.FlowReturn.OK`` or ``Gst.FlowReturn.ERROR``)
        """
        # pylint: disable=broad-exception-caught
        try:
            self.pyfunc.instance.process_buffer(buffer)
        except Exception as exc:
            return handle_fatal_error(
                self,
                self.logger,
                exc,
                f'Error in process_buffer() call for {self.pyfunc}.',
                self.dev_mode,
                Gst.FlowReturn.OK,
                Gst.FlowReturn.ERROR,
            )
        return Gst.FlowReturn.OK

    def do_request_new_pad(
        self,
        templ: Gst.PadTemplate,
        name: Optional[str] = None,
        caps: Optional[Gst.Caps] = None,
    ) -> Optional[Gst.Pad]:
        """Create a new pad on request.

        The pad name is always generated from the template and
        an internal counter; ``name`` and ``caps`` are not used.

        :param templ: pad template to create the pad from
        :param name: requested pad name (ignored)
        :param caps: requested caps (ignored)
        :return: the created pad, or ``None`` if the pad was not created
        """
        pad_name = templ.name_template % next(self._aux_pad_idx_gen)
        self.logger.info('Creating auxiliary pad %s', pad_name)
        pad: Gst.Pad = Gst.Pad.new_from_template(templ, pad_name)
        if pad is None:
            self.logger.error('Failed to create pad %s', pad_name)
            return None

        self.logger.debug('Created pad %s', pad.get_name())
        self.add_pad(pad)
        return pad
# Register the element class with the GObject type system.
GObject.type_register(GstPluginPyFunc)

# Element factory description: (element name, rank, element class).
__gstelementfactory__ = (
    GstPluginPyFunc.GST_PLUGIN_NAME,
    Gst.Rank.NONE,
    GstPluginPyFunc,
)