|
1 | 1 | from typing import Any |
2 | 2 | from io import BytesIO, BufferedReader |
| 3 | +from mimetypes import guess_type |
| 4 | +import base64 |
| 5 | +import os |
3 | 6 |
|
4 | 7 | try: |
5 | 8 | from pydantic import BaseModel |
|
14 | 17 | from pydantic import ConfigDict |
15 | 18 | class FileCompatibleBaseModel(BaseModel): |
16 | 19 | """ |
17 | | - Patched model_dump to extract file objects from SerializationIterator in V2 |
18 | | - and return as BufferedReader |
| 20 | + Patched model_dump to extract file objects from SerializationIterator in V2 |
| 21 | + and return as BufferedReader or base64 encoded dict as needed. |
19 | 22 | """ |
20 | | - # Allow extra fields even if it is not defined. This will allow models |
| 23 | + # Allow extra fields even if it is not defined. This will allow models |
21 | 24 | # to be more flexible if features are added in the Anvil API, but |
22 | 25 | # explicit support hasn't been added yet to this library. |
23 | 26 | model_config = ConfigDict( |
@@ -46,21 +49,41 @@ def _iterator_to_buffered_reader(self, value): |
46 | 49 | def _check_if_serialization_iterator(self, value): |
47 | 50 | return str(type(value).__name__) == 'SerializationIterator' and hasattr(value, '__next__') |
48 | 51 |
|
| 52 | + def _process_file_data(self, file_obj): |
| 53 | + """Process file object into base64 encoded dict format.""" |
| 54 | + # Read the file data and encode it as base64 |
| 55 | + file_content = file_obj.read() |
| 56 | + |
| 57 | + # Get filename - handle both regular files and BytesIO objects |
| 58 | + filename = getattr(file_obj, 'name', "document.pdf") |
| 59 | + |
| 60 | + if isinstance(filename, (bytes, bytearray)): |
| 61 | + filename = filename.decode('utf-8') |
| 62 | + |
| 63 | + # manage mimetype based on file extension |
| 64 | + mimetype = guess_type(filename)[0] or 'application/pdf' |
| 65 | + |
| 66 | + return { |
| 67 | + 'data': base64.b64encode(file_content).decode('utf-8'), |
| 68 | + 'mimetype': mimetype, |
| 69 | + 'filename': os.path.basename(filename) |
| 70 | + } |
| 71 | + |
49 | 72 | def model_dump(self, **kwargs): |
50 | 73 | data = super().model_dump(**kwargs) |
51 | 74 | for key, value in data.items(): |
52 | 75 | if key == 'file' and self._check_if_serialization_iterator(value): |
53 | 76 | # Direct file case |
54 | | - data[key] = self._iterator_to_buffered_reader(value) |
| 77 | + file_obj = self._iterator_to_buffered_reader(value) |
| 78 | + data[key] = self._process_file_data(file_obj) |
55 | 79 | elif key == 'files' and isinstance(value, list): |
56 | 80 | # List of objects case |
57 | 81 | for index, item in enumerate(value): |
58 | 82 | if isinstance(item, dict) and 'file' in item: |
59 | 83 | if self._check_if_serialization_iterator(item['file']): |
60 | | - data[key][index]['file'] = self._iterator_to_buffered_reader(item['file']) |
| 84 | + file_obj = self._iterator_to_buffered_reader(item['file']) |
| 85 | + data[key][index]['file'] = self._process_file_data(file_obj) |
61 | 86 | return data |
62 | | - |
63 | | - |
64 | 87 |
|
65 | 88 | else: |
66 | 89 | FileCompatibleBaseModel = BaseModel |
0 commit comments