Coverage for rfpy/suxint.py: 99%
457 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-24 10:52 +0000
1import re
2import inspect
3import logging
4import importlib
5import textwrap
6from itertools import chain
7from collections import defaultdict
8from typing import Iterable, Callable, Optional, Sequence
9from inspect import Parameter, Signature
12from apispec import APISpec
13from pydantic import BaseModel
15from pydantic.json_schema import models_json_schema
17from rfpy.web.ext.apispec import SuxPlugin
18from rfpy.web.ext.openapi_types import openapi_type_mapping
class RoutingError(AttributeError):
    """Raised by this module when a handler cannot be registered or resolved."""
# A valid handler name is an HTTP verb followed by "_<path elements>"
# (e.g. "get_this_that"), or the bare verb on its own, which is the name
# derived for a request to the root path. The alternation is grouped so that
# names which merely begin with a verb (e.g. "getter") are rejected.
HANDLER_PREFIX = re.compile(r"^(get|post|put|delete)(?:_|$)")
# Default pattern for URL path elements that are treated as arguments and
# stripped out before the handler name is derived: digits, or ":name" strings.
ARG_REGEX = r"/(\d+|:\w+)"

log = logging.getLogger(__name__)
def _set_exposed(handler):
    """
    Flag the given callable as reachable over HTTP.

    The callable's name is checked against HANDLER_PREFIX first; a
    RoutingError is raised when it does not match.
    """
    fname = handler.__name__
    if HANDLER_PREFIX.match(fname) is None:
        message = (
            f"{fname} is not a valid name for a handler function."
            " Must start with get, post, put or delete"
        )
        raise RoutingError(message)
    handler.http_exposed = True
def http(handler):
    """
    Decorator marking a handler callable as http accessible.

    Sets http_exposed=True on the callable (after validating its name) and
    returns the same callable.
    """
    _set_exposed(handler)
    return handler
def http_etag(handler):
    """
    Decorator marking a handler callable as http accessible with etag support.

    Sets both http_exposed=True and etag=True on the callable and returns it.
    """
    exposed = http(handler)
    exposed.etag = True
    return exposed
class Sux(object):
    """
    Sux is a dispatcher: it inspects an http request, derives the name of a
    handler function from the request method and path, and invokes it. This
    avoids maintaining a routes table of regular expressions.

    Handler Matching
    The handler name is the lower-cased http method followed by the URL path
    with slashes converted to underscores

        GET  /this/that/  -> get_this_that
        POST /ding/dong/  -> post_ding_dong

    Anything in the path that looks like an argument (according to
    self.arg_regex) is stripped out before the name is derived. The default
    regex is r'/(\\d+|:\\w+)' - digits, or strings that start with a colon.

        GET /this/23/that/ -> get_this_that
        GET /some/:cake    -> get_some

    Function arguments and invocation
    For each argument name declared by the handler, Sux looks up an attribute
    of the same name on the adaptor module. That "adaptor" is called with the
    request and its return value is passed to the handler, so the handler
    itself is decoupled from http.

    Example
        1 http request                GET /this/23/that?user_name=bob
        2 lookup matching handler     get_this_that(this_id, user_name)
        3 extract argument names      this_id, user_name
        4 find corresponding adaptors this_id(request), user_name(request)
        5 call adaptors, collect vals [23, 'bob']
        6 invoke handler              get_this_that(23, 'bob')

    @param handler_module - string module name or actual module containing
                            handler (end point) functions

    @param adaptor_module - string module name or actual module containing
                            adaptor functions. Defaults to the handler module.

    @param models_module - string name of module providing Pydantic schema models

    @param arg_regex - regular expression (string) defining which url path
                       elements should be treated as arguments. Defaults to
                       the module level ARG_REGEX.

    @param mount_depth - how many leading path elements are popped from the
                         request before the handler is resolved. With the
                         default of 1, GET '/api/things/23' resolves to the
                         handler named get_things.

    @param api_name - The name of this API for documentation purposes

    @param validate - when true, raise RoutingError if the handler module
                      contains no valid handlers. Set to False for easier
                      unit testing.
    """

    def __init__(
        self,
        handler_module,
        adaptor_module=None,
        models_module="rfpy.web.serial",
        arg_regex=None,
        mount_depth=1,
        api_name="PostRFP API",
        validate=True,
    ):
        self.models_module = models_module
        self.arg_regex = re.compile(ARG_REGEX if arg_regex is None else arg_regex)
        self.mount_depth = mount_depth
        self.api_name = api_name
        self.handler_module = None
        self.adaptor_module = None
        # sub-modules of the handler module found to contain valid handlers,
        # keyed by module name (populated as a side effect of iter_handlers)
        self.modules = {}

        # lazily generated OpenAPI document, see no_matching_handler
        self._spec_dict = None

        self._handler_cache = {}  # handler function name -> function
        self._handler_argspec_cache = {}  # function -> list of argument names

        self.load_handler_module(handler_module)
        self.load_adaptors(adaptor_module)
        self.load_handler_cache()

        if validate and not self.is_handler_module_valid():
            raise RoutingError(
                "handler module does not contain any valid handler functions"
            )

    def is_handler_module_valid(self):
        """True if at least one valid handler was found in the handler module"""
        return bool(self._handler_cache)

    def load_handler_module(self, handler_module):
        """Set self.handler_module from a module object or a dotted module name"""
        if isinstance(handler_module, str):
            handler_module = importlib.import_module(handler_module)
        elif not inspect.ismodule(handler_module):
            raise RoutingError("handler_module must be a module or string module name")
        self.handler_module = handler_module

    def load_handler_cache(self):
        """Populate the name -> function cache from iter_handlers()"""
        self._handler_cache.update(
            (found.name, found.func) for found in self.iter_handlers()
        )

    def load_adaptors(self, adaptor_module):
        """
        Set self.adaptor_module from a module object or a dotted module name,
        falling back to the handler module when None is given
        """
        if adaptor_module is None:
            resolved = self.handler_module
        elif isinstance(adaptor_module, str):
            resolved = importlib.import_module(adaptor_module)
        elif inspect.ismodule(adaptor_module):
            resolved = adaptor_module
        else:
            raise RoutingError("adaptor_module must be a module or string module name")
        self.adaptor_module = resolved

    def extract_handler_name(self, request):
        """
        Derive a handler function name from the request method and path.

        - Anything matching arg_regex is removed from the path
        - Remaining '/' characters become '_'

        e.g GET /project/1/people/ => ('get_project_people', 1)

        Returns a tuple of (function name, number of path arguments removed).
        A request for the root path yields just the method name, e.g. 'get'.
        """
        verb = request.method.lower()
        stripped_path, arg_count = self.arg_regex.subn("", request.path_info)
        suffix = stripped_path.strip("/").replace("/", "_")
        name = "%s_%s" % (verb, suffix) if suffix else verb
        return (name, arg_count)

    def no_matching_handler(self, handler_name, request):
        """
        Fallback for requests with no handler. A request for swagger.json is
        answered with the (cached) OpenAPI spec dict; anything else raises
        RoutingError.
        """
        if request.path_info_peek() != "swagger.json":
            raise RoutingError(
                'Handler "%s" not found in module "%s"'
                % (handler_name, self.handler_module.__name__)
            )

        if self._spec_dict is None:
            path = request.script_name.rstrip("/") + "/"
            self._spec_dict = create_spec(self, path=path).to_dict()
        return self._spec_dict

    def args_for_handler(self, handler_func):
        """
        Return the names of the positional-or-keyword parameters declared by
        the given callable. Results are cached per callable.
        """
        names = self._handler_argspec_cache.get(handler_func)
        if names is None:
            declared = [
                param_name
                for param_name, param in inspect.signature(
                    handler_func
                ).parameters.items()
                if param.kind is Parameter.POSITIONAL_OR_KEYWORD
            ]
            # anything that is not a plain function has its first argument
            # dropped (allow for self)
            offset = 0 if inspect.isfunction(handler_func) else 1
            names = declared[offset:]
            self._handler_argspec_cache[handler_func] = names
        return names

    def __call__(self, original_request):
        """
        Dispatch the request: resolve the handler, call the adaptor for each
        of its arguments and invoke the handler with the collected values.
        """
        request = original_request.copy()
        for _ in range(self.mount_depth):
            request.path_info_pop()

        handler_name, num_path_args = self.extract_handler_name(request)

        if handler_name not in self._handler_cache:
            return self.no_matching_handler(handler_name, request)
        handler = self._handler_cache[handler_name]

        if not getattr(handler, "http_exposed", False):
            raise RoutingError("Handler %s is not http accessible" % handler)
        if getattr(handler, "etag", False):
            original_request.generate_etag = True

        params = []

        for handler_arg in self.args_for_handler(handler):
            try:
                adaptor = getattr(self.adaptor_module, handler_arg)
            except AttributeError:
                raise AttributeError(
                    'Adaptor "%s" not found in module "%s"'
                    % (handler_arg, self.adaptor_module.__name__)
                )

            if isinstance(adaptor, PathArg):
                # each PathArg adaptor consumes one of the path arguments that
                # were stripped from the URL
                if num_path_args > 0:
                    num_path_args -= 1
                elif num_path_args == 0:
                    msg = (
                        f'Path argument "{adaptor.arg_name}" not found'
                        f" for function {handler.__name__}"
                    )
                    log.error(msg)
                    raise RoutingError(msg)

            params.append(adaptor(request))

        if num_path_args > 0:
            # check that get_example_path() doesn't answer to GET /example/path/1/
            # or GET /example/2/path
            raise RoutingError(
                f'Handler function for Request "{request.path}" '
                "must consume all path arguments"
            )

        return handler(*params)

    def iter_handler_modules(self):
        """
        Yield (name, module) pairs: the handler module itself under the name
        '__root__', followed by every module that is a member of it
        """
        yield ("__root__", self.handler_module)
        yield from inspect.getmembers(self.handler_module, inspect.ismodule)

    def iter_handlers(self) -> Iterable["Handler"]:
        """
        Yield a Handler for every function with a valid handler name that is
        flagged http_exposed. Non-root modules that provide at least one
        handler are recorded in self.modules.
        """
        for mod_name, mod in self.iter_handler_modules():
            is_root = mod_name == "__root__"
            tag = None if is_root else mod_name.capitalize()
            for func_name, func in inspect.getmembers(mod, inspect.isfunction):
                if not HANDLER_PREFIX.match(func_name):
                    continue
                if not getattr(func, "http_exposed", False):
                    continue
                if not is_root:
                    self.modules.setdefault(mod_name, mod)
                yield Handler(func_name, func, self.adaptor_module, tag=tag)

    def handler_by_name(self, handler_name):
        """Return the first Handler with the given name, or None"""
        return next(
            (found for found in self.iter_handlers() if found.name == handler_name),
            None,
        )
class Handler:
    """
    Provides a wrapper around a real api endpoint (function or method)

    Used primarily for generating documentation
    """

    def __init__(
        self, name: str, func: Callable, adaptor_mod, tag: Optional[str] = None
    ):
        self.name = name
        self.func = func
        self.adaptor_mod = adaptor_mod
        # e.g. 'get_this_that' -> ['get', 'this', 'that']
        self.name_elements = name.split("_")
        self._signature = None  # lazily computed, see parameters
        self._retval = None  # lazily computed, see return_value
        self._docs = DocString(func)
        self.tag = tag

    @property
    def method(self):
        """HTTP method, e.g. 'GET' or 'POST'"""
        return self.name_elements[0].upper()

    @property
    def path(self):
        """
        Show the documentation path for this Handler

        This effectively reverses the process that Sux follows - it constructs
        the 'real' HTTP path by interleaving the function name and the path
        arguments, so a handler named get_this_that with a PathArg named
        'this' becomes /this/{this_id}/that

        adaptor_map is keyed by the adaptor's arg_name, which is the path
        element the argument follows, e.g. {
            'project': <PathArg extracting the project id from the path>,
            'issue': <PathArg extracting the issue id from the path>
        }
        """
        adaptor_map = {
            adaptor.arg_name: adaptor for _arg_name, adaptor in self.path_params()
        }

        def lookup(match):
            path_part = match.groups()[0]
            if path_part in adaptor_map:
                adaptor = adaptor_map[path_part]
                # Interleave the parameter name into the function name derived path
                return "/%s/{%s}" % (path_part, adaptor.doc_name)
            return "/" + path_part

        unparameterised_path = "/" + "/".join(self.name_elements[1:])
        return re.sub(r"/(\w+)", lookup, unparameterised_path)

    def path_params(self):
        """Generator which yields (name, adaptor) for all PathArg arguments"""
        for arg_name in self.required_arguments:
            adaptor = getattr(self.adaptor_mod, arg_name, None)
            if adaptor is not None and isinstance(adaptor, PathArg):
                yield arg_name, adaptor

    def adaptors(self):
        """
        Generator which yields the adaptor for each argument of the wrapped
        function, or None where the adaptor module has no such attribute
        """
        for arg_name in chain(self.required_arguments, self.optional_arguments):
            yield getattr(self.adaptor_mod, arg_name, None)

    @property
    def parameters(self):
        """Returns the inspect.Parameter objects for this handler's function"""
        if getattr(self, "_signature", None) is None:
            self._signature = inspect.signature(self.func)
        return self._signature.parameters.values()

    @property
    def required_arguments(self):
        """The list of argument names without a default for the wrapped func"""
        return [p.name for p in self.parameters if p.default is Signature.empty]

    @property
    def optional_arguments(self):
        """A list of argument names with a default for the wrapped function"""
        return [p.name for p in self.parameters if p.default is not Signature.empty]

    @property
    def docs(self):
        """
        The docstring intro, followed by 'Permissions:' and 'Errors:'
        paragraphs when the docstring declares any
        """
        info = self._docs.intro
        perms = self.permissions
        if perms:
            info += "\n\nPermissions: " + perms
        errs = self.errors
        if errs:
            info += "\n\nErrors: " + errs
        return info

    @property
    def permissions(self):
        """Comma separated @permissions entries from the docstring, or None"""
        if not self._docs.permissions:
            return None
        return ", ".join(self._docs.permissions)

    @property
    def errors(self):
        """Comma separated @raises entries from the docstring, or None"""
        # Must test the errors list itself: testing permissions here hid the
        # @raises entries of any handler that declared no @permissions.
        if not self._docs.errors:
            return None
        return ", ".join(self._docs.errors)

    @property
    def return_annotation(self):
        """
        The second type argument of the function's return annotation, or None
        if the annotation has fewer than two type arguments
        """
        from typing import get_args

        retval = self.return_value
        try:
            return get_args(retval)[1]
        except IndexError:
            return None

    @property
    def return_value(self):
        """The 'return' annotation of the wrapped function, or None"""
        if self._retval is None:
            self._retval = inspect.get_annotations(self.func).get("return", None)
        return self._retval

    def __repr__(self):
        return (
            f"<suxint.Handler '{self.name}' Method: {self.method}, Path: {self.path}>"
        )
class DocString:
    """
    Parses a function's docstring, collecting '@name value' lines.

    Everything before the first '@' is the intro. Each '@name value' line is
    stored as a (name, value) pair in param_pairs.
    """

    def __init__(self, func):
        raw = func.__doc__
        self.docstring = "" if raw is None else textwrap.dedent(raw).strip()
        self.param_pairs = re.findall(r"@(\w+)\s+(.*)$", self.docstring, re.MULTILINE)

    def _key(self, key):
        """All values recorded under the given @name, in docstring order"""
        return [value for name, value in self.param_pairs if name == key]

    @property
    def intro(self):
        """The docstring text preceding the first '@' character"""
        return self.docstring.partition("@")[0].strip()

    @property
    def permissions(self):
        """Values of all @permissions lines"""
        return self._key("permissions")

    @property
    def errors(self):
        """Values of all @raises lines"""
        return self._key("raises")
# lower-cased strings which ArgExtractor.typed treats as boolean True
truthy_strings = {"true", "1", "yes"}


class ArgExtractor(object):
    """
    Base class for Argument Extractor classes

    An ArgExtractor instance is a callable which pulls a single value out of
    an HTTP request, validates it and optionally converts it.

    e.g. GetArg('name') produces a callable that knows how to extract the
    GET parameter 'name' from an http request

    Subclasses implement extract(request).
    """

    swagger_in: Optional[str] = None
    swagger_required = True

    def __init__(
        self,
        arg_name,
        validator=None,
        doc=None,
        default=None,
        arg_type="int",
        converter=None,
        enum_values=None,
        required=None,
    ) -> None:
        if enum_values is not None and not isinstance(enum_values, (set, list, tuple)):
            raise ValueError("enum_values must be a set, list or tuple")
        if validator is not None and not callable(validator):
            raise ValueError("validator, if given, must be a callable")

        self.arg_name = arg_name
        self.doc = doc
        self.default = default
        self.arg_type = arg_type
        self.converter = converter
        self.required = required
        self.enum_values = enum_values

        if validator is not None:
            self.validator = validator
        elif not hasattr(self, "validator"):
            # subclasses might define a validator, don't overwrite it
            self.validator = None

    def validate(self, value):
        """Raise ValueError if a validator is set and it rejects the value"""
        if self.validator and not self.validator(value):
            raise ValueError("%s is not a valid value" % value)

    def validate_enums(self, value):
        """
        Raise ValueError if enum_values is set and the value (or any member
        of a list, set or tuple value) is not one of them
        """
        if self.enum_values is None:
            return
        candidates = value if isinstance(value, (list, set, tuple)) else [value]
        for v in candidates:
            if v not in self.enum_values:
                raise ValueError(f"Value -{v}- is not one of {self.enum_values}")

    def typed(self, value):
        """
        Convert the value according to arg_type ('int', 'float' or 'boolean').
        None is passed through; any other arg_type yields the stripped string.
        """
        if value is None:
            return None
        text = (value if isinstance(value, str) else str(value)).strip()
        kind = self.arg_type
        if kind == "int":
            return int(text)
        if kind == "float":
            return float(text)
        if kind == "boolean":
            return text.lower() in truthy_strings

        # lstrip is a workaround to enable strings as path arguments
        # to be recognised, e.g. /user/:bob/delete
        return text.lstrip(":")

    def __call__(self, request):
        """Extract, validate and (if a converter is set) convert the value"""
        extracted = self.extract(request)
        self.validate(extracted)
        return extracted if self.converter is None else self.converter(extracted)

    def extract(self, request):
        """Pull the raw value from the request. Implemented by subclasses."""
        raise NotImplementedError

    def update_openapi_path_object(self, path_object: dict):
        """
        Append an OpenAPI parameter description of this argument to
        path_object["parameters"]
        """
        spec = {
            "in": self.swagger_in,
            "required": self.swagger_required,
            "name": self.doc_name,
        }
        # an explicit 'required' given to the constructor wins over the class default
        if self.required is not None:
            spec["required"] = self.required
        if self.doc is not None:
            spec["description"] = self.doc

        schema = {"type": self.swagger_type}
        spec["schema"] = schema
        if self.default is not None:
            schema["default"] = self.default

        enum_values = getattr(self, "enum_values", None)
        if enum_values is not None:
            target = schema["items"] if "items" in schema else schema
            target["enum"] = enum_values

        self.augment_openapi_path_object(spec)

        path_object["parameters"].append(spec)

    def augment_openapi_path_object(self, spec: dict) -> None:
        """
        Hook for subclasses to add further information to the OpenAPI
        parameter spec. Does nothing by default.
        """

    @property
    def swagger_type(self):
        """The OpenAPI type name corresponding to arg_type"""
        return openapi_type_mapping[self.arg_type]

    @property
    def doc_name(self):
        """The name of this Parameter for documentation purposes"""
        return self.arg_name
class PathArg(ArgExtractor):
    """
    Extracts a value from the URL path: the path element which immediately
    follows the element named arg_name, e.g. PathArg('project') yields 23
    for /project/23/
    """

    swagger_in = "path"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.path_regex = re.compile(r".*/%s/([^/]+)/*" % self.arg_name)

    def extract(self, request):
        """
        Return the typed path value. Any failure (no match, or a value that
        cannot be converted) is reported as ValueError.
        """
        try:
            found = self.path_regex.match(request.path_info)
            return self.typed(found.group(1))
        except Exception:
            mess = (
                f"{self.__class__.__name__} Adaptor could not extract "
                f'value "{self.arg_name}" from path {request.path_info} '
                f"using regex {self.path_regex.pattern}"
            )
            raise ValueError(mess)

    @property
    def doc_name(self):
        """The documentation name of a path parameter is arg_name plus '_id'"""
        return self.arg_name + "_id"
class GetArg(ArgExtractor):
    """
    Extracts a single query string parameter from request.GET.

    A missing parameter yields the typed default, unless the argument was
    declared required, in which case ValueError is raised. A parameter that
    is present but blank also yields the typed default.
    """

    swagger_in = "query"
    swagger_required = False

    def extract(self, request):
        if self.arg_name not in request.GET:
            if self.required:
                # message previously carried a stray doubled apostrophe
                raise ValueError(f"Query param '{self.arg_name}' must be provided")
            return self.typed(self.default)
        value = request.GET[self.arg_name]
        if value.strip() == "":
            return self.typed(self.default)
        # NOTE(review): the raw string is checked against enum_values before
        # conversion, whereas PostArg checks the typed value - confirm which
        # is intended for non-string enums.
        self.validate_enums(value)
        return self.typed(value)
class GetArgSet(ArgExtractor):
    """
    Extracts a Set (sequence) of argument values for the given GET arg

    Values are read from the query parameter named arg_name if present,
    otherwise from 'arg_name[]'.

    Additional keyword arguments:
      array_items_type - arg_type name of the individual items (default 'str')
      min_items - minimum number of values that must be supplied, if set
      max_items - maximum number of values that may be supplied, if set
    """

    swagger_in = "query"
    swagger_type = "array"

    def __init__(self, *args, **kwargs):
        self.array_items_type = kwargs.pop("array_items_type", "str")
        self.min_items = kwargs.pop("min_items", None)
        self.max_items = kwargs.pop("max_items", None)
        super().__init__(*args, **kwargs)

    def extract(self, request):
        """
        Return the set of typed values supplied for this argument. Raises
        ValueError if the number of values violates min_items / max_items or
        if any value is not one of enum_values.
        """
        arg_array_name = f"{self.arg_name}[]"
        if self.arg_name in request.GET:
            arg_array_name = self.arg_name
        values = request.GET.getall(arg_array_name)
        if self.min_items is not None and len(values) < self.min_items:
            raise ValueError(
                f"At least {self.min_items} {self.arg_name} parameters required"
            )
        if self.max_items is not None and len(values) > self.max_items:
            raise ValueError(
                f"No more than {self.max_items} {self.arg_name} parameters permitted"
            )
        self.validate_enums(values)
        return {self.typed(v) for v in values}

    def augment_openapi_path_object(self, spec: dict) -> None:
        """
        Describe this parameter as an array in the OpenAPI parameter spec.

        The array schema is built first and the item-count limits are added
        afterwards; previously the limits (and any enum) were written to the
        schema and then lost when the schema was replaced.
        """
        if self.array_items_type is not None:
            items = {"type": openapi_type_mapping[self.array_items_type]}
            previous = spec.get("schema") or {}
            if "enum" in previous:
                # the permitted values constrain each item, not the array
                items["enum"] = previous["enum"]
            # As before, a 'default' on the previous schema is not carried
            # over; extract() does not consult self.default.
            spec["schema"] = {"type": "array", "items": items}

        if self.min_items is not None:
            spec["schema"]["minItems"] = self.min_items

        if self.max_items is not None:
            spec["schema"]["maxItems"] = self.max_items
def _get_or_create_schema(
    path_object: dict, mime_type="application/x-www-form-urlencoded"
):
    """
    Return the dict at path_object["requestBody"]["content"][mime_type]["schema"],
    creating any of the intermediate dicts which do not exist yet.
    """
    node = path_object
    for key in ("requestBody", "content", mime_type, "schema"):
        node = node.setdefault(key, {})
    return node
class PostArg(ArgExtractor):
    """
    Extracts a single form field from request.POST, falling back to the
    default when the field is absent.
    """

    swagger_in = "formData"
    swagger_required = False
    swagger_type = "string"
    doc_name = "body"

    def extract(self, request):
        """
        Return the typed field value, or None if there is neither a value nor
        a default. Raises ValueError if the typed value is not one of
        enum_values.
        """
        raw = request.POST.get(self.arg_name, self.default)
        if raw is None:
            return None
        tv = self.typed(raw)
        if self.enum_values and tv not in self.enum_values:
            raise ValueError(f"{tv} is not in {self.enum_values}")
        return tv

    def update_openapi_path_object(self, path_object: dict):
        """Add this field as a property of the multipart/form-data body schema"""
        schema = _get_or_create_schema(path_object, mime_type="multipart/form-data")
        schema.setdefault("type", "object")
        prop = {"type": self.swagger_type}
        if self.enum_values is not None:
            prop["enum"] = list(self.enum_values)
        schema.setdefault("properties", {})[self.arg_name] = prop
class PostFileArg(ArgExtractor):
    """
    Extracts an uploaded file field from request.POST. The arg_type is always
    forced to 'file'.
    """

    swagger_in = "formData"
    swagger_type = "string"
    doc_name = "body"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **{**kwargs, "arg_type": "file"})

    def extract(self, request):
        """
        Return the posted value untouched, or None if absent. Raises
        ValueError if the field is absent and the argument is required.
        """
        uploaded = request.POST.get(self.arg_name, None)
        if self.required and uploaded is None:
            raise ValueError(f"Post Parameter {self.arg_name} must be provided")
        return uploaded

    def update_openapi_path_object(self, path_object: dict):
        """Add this field as a binary property of the multipart/form-data body schema"""
        schema = _get_or_create_schema(path_object, mime_type="multipart/form-data")
        schema.setdefault("type", "object")
        schema.setdefault("properties", {})[self.arg_name] = {
            "type": "string",
            "format": "binary",
        }
def downgrade_prop(prop):
    """
    Rewrite a single property schema, in place, into the older JSON Schema
    form:

    - "const": x becomes "enum": [x]
    - a numeric "exclusiveMinimum"/"exclusiveMaximum" becomes the boolean
      True, with the number moved to "minimum"/"maximum"
    """
    if "const" in prop:
        prop["enum"] = [prop.pop("const")]

    for exclusive, plainProp in (
        ("exclusiveMinimum", "minimum"),
        ("exclusiveMaximum", "maximum"),
    ):
        if exclusive not in prop:
            continue
        limit, prop[exclusive] = prop[exclusive], True
        prop[plainProp] = limit
def _downgrade_json_schema(model_spec):
    """
    OpenAPI v3 uses an older version of JSON Schema than the one Pydantic
    emits. This function rewrites the pydantic output, in place, to make it
    compatible with OpenAPI.

    "title" is used for models and model properties in the pydantic output
    but OpenAPI doesn't need it, and it adds verbosity, so it is deleted.
    """
    model_spec.pop("title", None)
    for prop in model_spec.get("properties", {}).values():
        prop.pop("title", None)
        downgrade_prop(prop)
        if "items" in prop:
            downgrade_prop(prop["items"])
def servers_object(path):
    """
    Build the OpenAPI "servers" list for an API mounted at the given path.

    Leading slashes on the path are dropped so that a rooted path such as
    '/api/' does not produce a double slash after the host name.
    """
    relative_path = path.lstrip("/")
    return [
        {"url": f"https://www.rfpalchemy.com/{relative_path}"},
        {"url": f"http://localhost:9000/{relative_path}"},
    ]
def add_model_components(spec, sux_instance: Sux):
    """
    Register every pydantic model found in the Sux instance's models module
    as a schema component of the given spec.

    Raises ValueError if sux_instance.models_module is not a string module
    name.
    """
    if isinstance(sux_instance.models_module, str):
        models_mod = importlib.import_module(sux_instance.models_module)
    else:
        raise ValueError(
            "sux_instance models_module attribute must be a string module name"
        )

    # every BaseModel subclass exposed by the module, in "validation" mode
    mods: Sequence = [
        (mod, "validation")
        for mod in (getattr(models_mod, m) for m in dir(models_mod))
        if (
            isinstance(mod, type)
            and issubclass(mod, BaseModel)
            and mod is not BaseModel
        )
    ]

    try:
        # This rebuild is required because Nodes has recursive properties
        # - horrible hack because I cannot figure out how to determine if a model has been
        # created with call to typing.ForwardRef. See pydantic docs
        # https://pydantic-docs.helpmanual.io/usage/postponed_annotations/
        getattr(models_mod, "Nodes").model_rebuild()
    except AttributeError:
        pass

    # ref_template is a format string: without the {model} placeholder every
    # generated $ref would be the same string instead of pointing at the
    # component registered under the model's name below.
    _, fulldefs = models_json_schema(
        mods, ref_template="#/components/schemas/{model}"
    )

    defs = fulldefs["$defs"]
    for model_name in defs.keys():
        model_spec = defs[model_name]
        _downgrade_json_schema(model_spec)
        spec.components.schema(model_name, model_spec)
def create_spec(sux_instance: Sux, path="api/") -> APISpec:
    """
    Build the OpenAPI (3.0.3) spec for all handlers of the given Sux
    instance: one path entry per distinct handler path, the pydantic model
    components, and one tag per handler sub-module.
    """
    from .web.base import API_VERSION

    spec = APISpec(
        title=sux_instance.api_name,
        info={"description": "API for managing evaluation projects"},
        version=str(API_VERSION),
        openapi_version="3.0.3",
        plugins=[SuxPlugin()],
        servers=servers_object(path),
    )

    # group handlers sharing a URL (e.g. GET and POST on the same path)
    handlers_by_path = defaultdict(list)
    for handler in sux_instance.iter_handlers():
        handlers_by_path[handler.path].append(handler)

    for url_key in sorted(handlers_by_path):
        spec.path(path=url_key, sux_handler=(url_key, handlers_by_path[url_key]))

    add_model_components(spec, sux_instance)

    for mod_name, mod in sux_instance.modules.items():
        tag_dict = {"name": mod_name.capitalize()}
        module_doc = mod.__doc__
        if module_doc:
            tag_dict["description"] = textwrap.dedent(module_doc).strip()
        spec.tag(tag_dict)

    return spec