-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathmain.py
More file actions
676 lines (596 loc) · 23.8 KB
/
main.py
File metadata and controls
676 lines (596 loc) · 23.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
"""RoboSystems Service API main application module."""
import time
from contextlib import asynccontextmanager
from datetime import UTC, datetime
from importlib.metadata import version as pkg_version
from pathlib import Path
from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.utils import get_openapi
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from robosystems.config import env
from robosystems.config.logging import get_logger
from robosystems.config.openapi_tags import MAIN_API_TAGS
from robosystems.config.validation import EnvValidator
from robosystems.middleware.database import DatabaseSessionMiddleware
from robosystems.middleware.logging import (
SecurityLoggingMiddleware,
StructuredLoggingMiddleware,
)
from robosystems.middleware.otel import setup_telemetry
from robosystems.middleware.otel.metrics import (
get_endpoint_metrics,
record_error_metrics,
record_request_metrics,
)
from robosystems.middleware.rate_limits import RateLimitHeaderMiddleware
from robosystems.routers import (
auth_router_v1,
billing_router_v1,
graph_router,
offering_router_v1,
operations_router_v1,
orgs_router_v1,
status_router_v1,
user_router_v1,
)
from robosystems.routers import (
router as v1_router,
)
from robosystems.routers.admin import (
cache_router as admin_cache_router,
)
from robosystems.routers.admin import (
credits_router as admin_credits_router,
)
from robosystems.routers.admin import (
graphs_router as admin_graphs_router,
)
from robosystems.routers.admin import (
invoice_router as admin_invoice_router,
)
from robosystems.routers.admin import (
orgs_router as admin_orgs_router,
)
from robosystems.routers.admin import (
subscription_router as admin_subscription_router,
)
from robosystems.routers.admin import (
users_router as admin_users_router,
)
from robosystems.routers.admin import (
webhooks_router as admin_webhooks_router,
)
from robosystems.utils.docs_template import (
generate_robosystems_docs,
generate_robosystems_redoc,
)
# Module-level logger used by the lifespan hooks and the app factory below.
logger = get_logger("robosystems.api")

# Path prefixes whose responses may contain per-user secrets (tokens,
# API keys, billing details, org membership). These get `Cache-Control:
# no-store` applied in the security-headers middleware.
# Matched with `str.startswith`, so every sub-path under a prefix is covered.
_SENSITIVE_PATH_PREFIXES = ("/v1/auth", "/v1/user", "/v1/billing", "/v1/orgs")
def is_relaxed_csp_path(path: str) -> bool:
  """Tell whether *path* needs CDN-hosted scripts/styles (Swagger UI, GraphiQL).

  True for the two docs pages (``/`` and ``/docs``), for anything whose path
  starts with ``/static``, and for ``/extensions/.../graphql`` endpoints.

  Args:
    path: The URL path of the incoming request.

  Returns:
    True when the path matches one of the cases above, False otherwise.
  """
  serves_docs_page = path in ("/", "/docs")
  serves_static_asset = path.startswith("/static")
  serves_graphql = path.startswith("/extensions/") and path.endswith("/graphql")
  return serves_docs_page or serves_static_asset or serves_graphql
@asynccontextmanager
async def lifespan(app: FastAPI):
  """Startup + shutdown lifecycle. Replaces deprecated @on_event handlers.

  Startup, in order:
    1. Validate the environment configuration. A failure is re-raised only
       when ``env.ENVIRONMENT == "prod"``; otherwise it is logged and startup
       continues.
    2. Initialize the query queue executor (failure is logged, not raised).
    3. Start the Redis SSE event subscriber (failure is logged, not raised).

  Shutdown: stop the Redis SSE event subscriber (failure is logged, not raised).

  Every swallowed exception is logged with its traceback (``exc_info=True``)
  so a degraded startup can still be diagnosed from the logs.

  Args:
    app: The FastAPI application. Not used in the body; it is the argument
      FastAPI passes to a lifespan context manager.

  Raises:
    Exception: Whatever configuration validation raised, in prod only.
  """
  logger.info("Starting RoboSystems API...")

  # Validate environment configuration
  try:
    EnvValidator.validate_required_vars(env)
    config_summary = EnvValidator.get_config_summary(env)
    logger.info(f"Configuration validated successfully: {config_summary}")
  except Exception as e:
    logger.error(f"Configuration validation failed: {e}", exc_info=True)
    if env.ENVIRONMENT == "prod":
      # Fail fast in prod; continue in dev so local iteration isn't blocked.
      raise
    logger.warning("Continuing with invalid configuration (development mode)")

  # Initialize query queue executor
  try:
    # Imported here rather than at module top, as in the original.
    from robosystems.routers.graphs.query.setup import setup_query_executor

    setup_query_executor()
  except Exception as e:
    # Best-effort: the API still starts without the query queue.
    logger.error(f"Failed to initialize query queue: {e}", exc_info=True)

  # Start Redis SSE event subscriber for worker → API communication
  try:
    from robosystems.middleware.sse.redis_subscriber import start_redis_subscriber

    await start_redis_subscriber()
    logger.info("Redis SSE event subscriber started successfully")
  except Exception as e:
    # Best-effort: the API still starts without the subscriber.
    logger.error(f"Failed to start Redis SSE subscriber: {e}", exc_info=True)

  logger.info("RoboSystems API startup complete")

  # The application serves requests while suspended here.
  yield

  logger.info("Shutting down RoboSystems API...")
  try:
    from robosystems.middleware.sse.redis_subscriber import stop_redis_subscriber

    await stop_redis_subscriber()
    logger.info("Redis SSE event subscriber stopped successfully")
  except Exception as e:
    # Never let a cleanup failure abort the rest of shutdown.
    logger.error(f"Error stopping Redis SSE subscriber: {e}", exc_info=True)
  logger.info("RoboSystems API shutdown complete")
def create_app() -> FastAPI:
  """Create and configure the RoboSystems FastAPI application.

  Builds the app, then wires up (in this order): telemetry, the optional
  ``/static`` mount, the custom docs pages, CORS plus the logging / database /
  rate-limit middleware, the GraphQL request-metrics middleware, the
  security-headers middleware, the global exception handler, every router
  (core, feature-flagged extensions, admin) and the custom OpenAPI generator.

  Returns:
    FastAPI: The configured FastAPI application.
  """
  # Load description from markdown file in static folder
  description_file = Path(__file__).parent / "static" / "description.md"
  api_description = (
    # Explicit encoding so the result does not depend on the host locale.
    description_file.read_text(encoding="utf-8")
    if description_file.exists()
    else "RoboSystems Service API"
  )

  app = FastAPI(
    title="RoboSystems API",
    version=pkg_version("robosystems"),
    description=api_description,
    docs_url=None,  # replaced by custom_docs below
    redoc_url=None,  # replaced by custom_redoc below
    openapi_url="/openapi.json",
    openapi_tags=MAIN_API_TAGS,
    lifespan=lifespan,
  )

  setup_telemetry(app)

  app.state.current_time = datetime.now(UTC)

  # NOTE: "static" is resolved against the process working directory here,
  # unlike description_file above, which is resolved next to this module.
  if Path("static").exists():
    app.mount("/static", StaticFiles(directory="static"), name="static")

  # Custom dark-themed Swagger + ReDoc (served inline from docs_template).
  @app.get("/", response_class=HTMLResponse, include_in_schema=False)
  async def custom_docs():
    return HTMLResponse(content=generate_robosystems_docs())

  @app.get("/docs", response_class=HTMLResponse, include_in_schema=False)
  async def custom_redoc():
    return HTMLResponse(content=generate_robosystems_redoc())

  # Configure CORS with specific domains for security
  main_cors_origins = env.get_main_cors_origins()
  logger.info(f"Main API CORS origins: {main_cors_origins}")

  app.add_middleware(
    CORSMiddleware,
    allow_origins=main_cors_origins,
    allow_credentials=True,  # Always enabled for cookie-based auth
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
    allow_headers=[
      "Accept",
      "Accept-Language",
      "Content-Type",
      "Authorization",
      "X-API-Key",
      "X-Requested-With",
      # Operation endpoints under /extensions/{domain}/{graph_id}/operations/*
      # accept Idempotency-Key for safe retries — must be in allow_headers
      # or browser preflight will reject it for cross-origin requests.
      "Idempotency-Key",
    ],
    expose_headers=["X-Request-ID", "X-Rate-Limit-Remaining", "X-Rate-Limit-Reset"],
    max_age=3600,  # Cache preflight requests for 1 hour
  )

  # Add logging middleware. Order matters: each newly added middleware wraps
  # the stack built so far, so the LAST one added is the outermost layer and
  # sees the request first / the response last.
  app.add_middleware(StructuredLoggingMiddleware)
  app.add_middleware(SecurityLoggingMiddleware)

  # Add database session cleanup middleware
  app.add_middleware(DatabaseSessionMiddleware)

  # Add rate limit header middleware
  app.add_middleware(RateLimitHeaderMiddleware)

  # Request-level metrics for /extensions/{graph_id}/graphql.
  # Per-resolver spans come from Strawberry's OpenTelemetryExtensionSync
  # (wired in graphql/schema.py); this covers the request envelope.
  @app.middleware("http")
  async def extensions_graphql_metrics_middleware(request: Request, call_next):
    path = request.url.path
    if not (path.startswith("/extensions/") and path.endswith("/graphql")):
      return await call_next(request)

    try:
      # "/extensions/<graph_id>/graphql" -> ["", "extensions", "<graph_id>", ...]
      graph_id = path.split("/", 3)[2]
    except IndexError:  # pragma: no cover - path matcher already checked shape
      graph_id = None

    # Normalized label keeps Prometheus cardinality bounded; tenant goes
    # on the business event instead.
    endpoint_label = "/extensions/{graph_id}/graphql"
    # Monotonic clock: the duration must not be skewed by wall-clock changes.
    start = time.perf_counter()
    error_occurred = False
    status_code = 200
    user_id: str | None = None
    try:
      response = await call_next(request)
      status_code = response.status_code
      user_id = getattr(request.state, "user_id", None)
      # Only successful POSTs count as a GraphQL query business event.
      if request.method == "POST" and 200 <= status_code < 300:
        get_endpoint_metrics().record_business_event(
          endpoint=endpoint_label,
          method=request.method,
          event_type="extensions_graphql_query",
          event_data={"graph_id": graph_id} if graph_id else {},
          user_id=user_id,
        )
      return response
    except Exception as exc:
      error_occurred = True
      status_code = getattr(exc, "status_code", 500)
      # Read the user here as well, so the error and request metrics below
      # carry it whenever it was set on request.state before the failure.
      user_id = getattr(request.state, "user_id", None)
      record_error_metrics(
        endpoint=endpoint_label,
        method=request.method,
        error_type=type(exc).__name__,
        error_code=str(getattr(exc, "detail", "Unknown error")),
        user_id=user_id,
      )
      raise
    finally:
      # Runs on both the success and the error path.
      duration = time.perf_counter() - start
      record_request_metrics(
        endpoint=endpoint_label,
        method=request.method,
        status_code=status_code,
        duration=duration,
        user_id=user_id,
        error_occurred=error_occurred,
      )

  # Path-based CSP — strict for API, relaxed for docs / GraphiQL.
  # Both header values are constant, so they are built once here instead of
  # on every request.
  relaxed_csp = "; ".join(
    [
      "default-src 'self'",
      "script-src 'self' 'unsafe-inline' 'unsafe-eval' https://cdn.jsdelivr.net https://unpkg.com https://cdn.redoc.ly",
      "style-src 'self' 'unsafe-inline' https://unpkg.com https://fonts.googleapis.com",
      "img-src 'self' data: https: blob:",
      "font-src 'self' data: https://fonts.gstatic.com",
      "connect-src 'self' https://unpkg.com https://cdn.redoc.ly webpack:",  # Allow source maps
      "worker-src 'self' blob:",  # Allow web workers from blob URLs
      "frame-ancestors 'none'",
      "base-uri 'self'",
      "form-action 'self'",
    ]
  )
  # Strict CSP for API endpoints
  strict_csp = "; ".join(
    [
      "default-src 'self'",
      "script-src 'self'",  # NO unsafe-inline for API
      "style-src 'self'",
      "img-src 'self' data:",
      "connect-src 'self'",
      "frame-ancestors 'none'",
      "base-uri 'self'",
      "form-action 'self'",
    ]
  )

  # Add security headers middleware
  @app.middleware("http")
  async def security_headers_middleware(request: Request, call_next):
    """Add security headers to all responses."""
    response = await call_next(request)

    # Core security headers
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

    # HSTS for production/staging
    if env.ENVIRONMENT in ["prod", "staging"]:
      response.headers["Strict-Transport-Security"] = (
        "max-age=31536000; includeSubDomains"
      )

    path = request.url.path
    response.headers["Content-Security-Policy"] = (
      relaxed_csp if is_relaxed_csp_path(path) else strict_csp
    )

    # Cache-Control: no-store for routes that may return per-user secrets.
    # (Path-prefix allowlist — StreamingResponse has no .body to scan.)
    if path.startswith(_SENSITIVE_PATH_PREFIXES):
      response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, private"
      response.headers["Pragma"] = "no-cache"

    # Permissions Policy
    response.headers["Permissions-Policy"] = "geolocation=(), camera=(), microphone=()"

    return response

  # Exception handler for application-wide error handling
  @app.exception_handler(Exception)
  async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """Global exception handler returning generic error and request ID.

    Internal exception details are logged server-side; clients receive a generic
    message with a correlation identifier.
    """
    request_id = getattr(request.state, "request_id", None)

    # Log full details with correlation ID
    try:
      logger.error(
        "Unhandled exception", extra={"request_id": request_id}, exc_info=True
      )
    except Exception:
      # Ensure handler never fails
      pass

    return JSONResponse(
      status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
      content={"detail": "Internal server error", "request_id": request_id},
    )

  # Core platform routers
  app.include_router(auth_router_v1)
  app.include_router(status_router_v1)
  app.include_router(user_router_v1)
  app.include_router(orgs_router_v1)
  app.include_router(v1_router)
  app.include_router(graph_router)
  app.include_router(offering_router_v1)
  app.include_router(operations_router_v1)
  app.include_router(billing_router_v1)

  # Extensions GraphQL endpoint (Strawberry). Graph-scoped at
  # /extensions/{graph_id}/graphql — see robosystems/graphql/README.md.
  if env.EXTENSIONS_GRAPHQL_ENABLED and (
    env.ROBOLEDGER_ENABLED or env.ROBOINVESTOR_ENABLED
  ):
    from fastapi import Depends as _Depends
    from strawberry.fastapi import GraphQLRouter

    from robosystems.graphql import schema as extensions_graphql_schema
    from robosystems.graphql.context import get_context as graphql_context_getter
    from robosystems.middleware.rate_limits import (
      subscription_aware_rate_limit_dependency,
    )

    graphql_router = GraphQLRouter(
      extensions_graphql_schema,
      context_getter=graphql_context_getter,
      # GraphiQL playground (dev only).
      graphql_ide="graphiql" if env.is_development() else None,
    )
    app.include_router(
      graphql_router,
      prefix="/extensions/{graph_id}/graphql",
      tags=["Extensions: GraphQL"],
      include_in_schema=True,
      dependencies=[_Depends(subscription_aware_rate_limit_dependency)],
    )

  # Extensions REST operation surface: POST /extensions/{domain}/{graph_id}/operations/{op}
  if env.ROBOLEDGER_ENABLED:
    from robosystems.routers.extensions.roboledger.operations import (
      router as roboledger_operations_router,
    )
    from robosystems.routers.extensions.roboledger.reads import (
      router as roboledger_reads_router,
    )

    app.include_router(
      roboledger_operations_router,
      prefix="/extensions/roboledger/{graph_id}/operations",
      include_in_schema=True,
    )
    app.include_router(
      roboledger_reads_router,
      prefix="/extensions/roboledger/{graph_id}/operations",
      include_in_schema=True,
    )

  # build-fact-grid mounts independently of ROBOLEDGER_ENABLED so SEC-only
  # deployments still get it. Rationale in routers/extensions/roboledger/views.py.
  if env.FACT_GRID_ENABLED:
    from robosystems.routers.extensions.roboledger.views import (
      router as roboledger_views_router,
    )

    app.include_router(
      roboledger_views_router,
      prefix="/extensions/roboledger/{graph_id}/operations",
      include_in_schema=True,
    )

  if env.ROBOINVESTOR_ENABLED:
    from robosystems.routers.extensions.roboinvestor.operations import (
      router as roboinvestor_operations_router,
    )

    app.include_router(
      roboinvestor_operations_router,
      prefix="/extensions/roboinvestor/{graph_id}/operations",
      include_in_schema=True,
    )

  # Admin routers — hidden from the public OpenAPI schema.
  app.include_router(admin_cache_router, include_in_schema=False)
  app.include_router(admin_subscription_router, include_in_schema=False)
  app.include_router(admin_invoice_router, include_in_schema=False)
  app.include_router(admin_webhooks_router, include_in_schema=False)
  app.include_router(admin_credits_router, include_in_schema=False)
  app.include_router(admin_graphs_router, include_in_schema=False)
  app.include_router(admin_users_router, include_in_schema=False)
  app.include_router(admin_orgs_router, include_in_schema=False)

  # Custom OpenAPI schema
  def custom_openapi():
    """
    Custom OpenAPI schema generator.

    Builds the schema on first call and caches it on ``app.openapi_schema``;
    later calls return the cached value. On top of FastAPI's generated schema
    it adds the security schemes, the shared ``OperationError`` schema, shared
    responses / headers / ``Idempotency-Key`` parameter on operation routes,
    auth notes on the GraphQL route, per-operation ``security`` and an ordered
    ``tags`` list.

    Returns:
      dict: The OpenAPI schema.
    """
    if app.openapi_schema:
      return app.openapi_schema

    openapi_schema = get_openapi(
      title=app.title,
      version=app.version,
      description=app.description,
      routes=app.routes,
    )

    # Set up components structure if it doesn't exist
    if "components" not in openapi_schema:
      openapi_schema["components"] = {}

    # Set up security schemes (API key and Bearer JWT)
    openapi_schema["components"]["securitySchemes"] = {
      "APIKeyHeader": {
        "type": "apiKey",
        "in": "header",
        "name": "X-API-Key",
        "description": "API key for authentication",
      },
      "BearerAuth": {
        "type": "http",
        "scheme": "bearer",
        "bearerFormat": "JWT",
        "description": "JWT bearer token",
      },
    }

    # Ensure schemas section exists
    if "schemas" not in openapi_schema["components"]:
      openapi_schema["components"]["schemas"] = {}

    # Shared error shape for every operation route (see _is_operation_path
    # below). SDK codegen tools use this to produce a typed error union so
    # clients can branch on 409 / 422 / 4xx without unstructured `detail` reads.
    openapi_schema["components"]["schemas"]["OperationError"] = {
      "type": "object",
      "description": (
        "Error envelope returned by extensions operation endpoints. "
        "Shape aligns with FastAPI's default error detail plus an optional "
        "`operation_id` for audit correlation."
      ),
      "properties": {
        "detail": {
          "oneOf": [
            {"type": "string"},
            {"type": "object"},
          ],
          "description": "Human-readable error detail or structured payload",
        },
        "operation_id": {
          "type": "string",
          "description": (
            "op_-prefixed ULID if the dispatcher minted one before the "
            "failure (async ops, idempotency conflicts, etc.)"
          ),
        },
      },
    }

    # Shared error responses / Idempotency-Key header / rate-limit header
    # specs, injected below into every operation route so the router files
    # stay thin and the wire shape stays consistent.
    _op_error_ref = {
      "application/json": {"schema": {"$ref": "#/components/schemas/OperationError"}}
    }
    _rate_limit_response_headers = {
      "X-Rate-Limit-Remaining": {
        "description": "Requests remaining in the current rate-limit window",
        "schema": {"type": "integer"},
      },
      "X-Rate-Limit-Reset": {
        "description": "Unix epoch seconds at which the current window resets",
        "schema": {"type": "integer"},
      },
    }
    _shared_operation_responses = {
      "400": {"description": "Invalid request payload", "content": _op_error_ref},
      "401": {"description": "Unauthorized — missing or invalid credentials"},
      "403": {"description": "Forbidden — caller cannot access this graph"},
      "404": {
        "description": "Resource not found (graph, ledger, report, etc.)",
        "content": _op_error_ref,
      },
      "409": {
        "description": (
          "Idempotency-Key reused with a different request body, or other "
          "operation-level conflict"
        ),
        "content": _op_error_ref,
      },
      "422": {
        "description": "Semantic validation failure (unbalanced ledger, etc.)",
        "content": _op_error_ref,
      },
      "429": {"description": "Rate limit exceeded"},
      "500": {"description": "Internal error"},
    }
    _idempotency_header_parameter = {
      "name": "Idempotency-Key",
      "in": "header",
      "required": False,
      "description": (
        "Optional client-supplied key for safe retries. Same key + same "
        "body within 24 hours replays the cached envelope; same key + "
        "different body returns HTTP 409 Conflict. Use a fresh key for "
        "distinct payloads (UUID v4 recommended)."
      ),
      "schema": {"type": "string", "maxLength": 255},
    }
    _idempotency_doc_paragraph = (
      "\n\n**Idempotency**: supply an `Idempotency-Key` header to make "
      "safe retries; replays within 24 hours return the same envelope. "
      "Reusing the key with a different body returns HTTP 409 Conflict."
    )
    # Constant text, so it is defined once rather than per GraphQL operation.
    _graphql_description_note = (
      "\n\n**Auth**: pass `X-API-Key` (or a JWT `Authorization: "
      "Bearer` header). Unauthenticated introspection queries are "
      "deliberately allowed for SDK codegen; data queries require "
      "credentials and raise `UNAUTHENTICATED`."
      "\n\n**Error codes**: `LEDGER_NOT_INITIALIZED`, "
      "`INVESTOR_NOT_INITIALIZED`, and `UNAUTHENTICATED` surface in "
      "the GraphQL `errors[].extensions.code` field — see "
      "`graphql/README.md` for the full vocabulary."
    )

    def _is_operation_path(p: str) -> bool:
      # Operation routes live under either /extensions/ or /graphs/.
      return "/operations/" in p and ("/extensions/" in p or "/graphs/" in p)

    def _is_graphql_path(p: str) -> bool:
      return p.startswith("/extensions/") and p.endswith("/graphql")

    for _path, _methods in openapi_schema.get("paths", {}).items():
      if _is_operation_path(_path):
        for _method_name, _operation in _methods.items():
          if _method_name != "post":
            continue
          existing_responses = _operation.setdefault("responses", {})
          # setdefault: a response the route declared itself always wins.
          for _code, _resp in _shared_operation_responses.items():
            existing_responses.setdefault(_code, _resp)
          # Advertise the rate-limit headers on every 2xx response.
          for _code, _resp in existing_responses.items():
            if _code.startswith("2") and isinstance(_resp, dict):
              _resp.setdefault("headers", {}).update(_rate_limit_response_headers)
          params = _operation.setdefault("parameters", [])
          if not any(
            p.get("name") == "Idempotency-Key" and p.get("in") == "header"
            for p in params
          ):
            params.append(_idempotency_header_parameter)
          # `or ""` also covers a description that is present but None.
          current_description = _operation.get("description") or ""
          if _idempotency_doc_paragraph not in current_description:
            _operation["description"] = (
              current_description + _idempotency_doc_paragraph
            )
      elif _is_graphql_path(_path):
        for _method_name, _operation in _methods.items():
          if _method_name not in ("post", "get"):
            continue
          existing_responses = _operation.setdefault("responses", {})
          existing_responses.setdefault(
            "401",
            {"description": "Unauthorized — credentials presented but invalid"},
          )
          existing_responses.setdefault(
            "403",
            {"description": "Forbidden — caller cannot access this graph"},
          )
          existing_responses.setdefault("429", {"description": "Rate limit exceeded"})
          current_description = _operation.get("description") or ""
          if _graphql_description_note not in current_description:
            _operation["description"] = current_description + _graphql_description_note

    # Declare API key + Bearer as accepted security schemes on every
    # non-public endpoint.
    public_exact_paths = {"/v1/status"}
    public_prefixes = ("/v1/auth", "/v1/offering")

    for path, methods in openapi_schema.get("paths", {}).items():
      if path in public_exact_paths or path.startswith(public_prefixes):
        continue
      for operation in methods.values():
        operation["security"] = [{"APIKeyHeader": []}, {"BearerAuth": []}]

    # Apply the custom tag ordering from openapi_tags, only emitting
    # tags that are actually in use.
    declared_tags = app.openapi_tags or []
    tag_order = [tag_info["name"] for tag_info in declared_tags]
    existing_tags = {
      tag
      for path_info in openapi_schema.get("paths", {}).values()
      for method_info in path_info.values()
      for tag in method_info.get("tags", [])
    }
    # A declared tag without a description falls back to the generic text below.
    tag_descriptions = {
      tag_info["name"]: tag_info["description"]
      for tag_info in declared_tags
      if "description" in tag_info
    }

    ordered_tags = [
      {"name": tag, "description": tag_descriptions.get(tag, f"{tag} operations")}
      for tag in tag_order
      if tag in existing_tags
    ]
    # Tags in use but not declared go last, sorted so the generated schema is
    # identical from one process to the next (set order is not).
    ordered_tags.extend(
      {"name": tag, "description": f"{tag} operations"}
      for tag in sorted(existing_tags - set(tag_order))
    )

    openapi_schema["tags"] = ordered_tags

    app.openapi_schema = openapi_schema
    return app.openapi_schema

  app.openapi = custom_openapi

  return app
# Module-level application instance, built once when this module is imported.
# NOTE(review): presumably the import target for the ASGI server (e.g.
# `main:app`) — confirm against the deployment config.
app = create_app()