diff --git a/main.py b/main.py index 4476e03..45d21ab 100755 --- a/main.py +++ b/main.py @@ -121,6 +121,28 @@ class OpenAPIServer: description="List all available API endpoints", inputSchema={"type": "object", "properties": {}}, ), + types.Tool( + name="search_schemas", + description="Search schema definitions by name", + inputSchema={ + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search term to match against schema names"} + }, + "required": ["query"], + }, + ), + types.Tool( + name="get_schema", + description="Get a specific schema definition with all $ref references resolved", + inputSchema={ + "type": "object", + "properties": { + "schema_name": {"type": "string", "description": "Name of the schema to retrieve"} + }, + "required": ["schema_name"], + }, + ), ] @self.server.call_tool() @@ -155,6 +177,23 @@ class OpenAPIServer: types.TextContent(type="text", text=json.dumps(results, indent=2)) ] + elif name == "search_schemas": + query = arguments.get("query", "") if arguments else "" + self.logger.debug(f"Searching schemas with query: {query}") + results = self.search_schemas(query) + self.logger.info(f"Found {len(results)} matching schemas") + return [ + types.TextContent(type="text", text=json.dumps(results, indent=2)) + ] + + elif name == "get_schema": + schema_name = arguments.get("schema_name", "") if arguments else "" + self.logger.debug(f"Getting schema details for: {schema_name}") + result = self.get_schema_details(schema_name) + return [ + types.TextContent(type="text", text=json.dumps(result, indent=2)) + ] + else: self.logger.error(f"Unknown tool requested: {name}") raise ValueError(f"Unknown tool: {name}") @@ -220,6 +259,141 @@ class OpenAPIServer: ) return results + def resolve_schema_ref(self, ref: str, visited: set = None) -> Dict: + """Resolve a $ref reference to its actual schema definition + + Args: + ref: The reference string (e.g., "#/components/schemas/MySchema" or "#/definitions/MySchema") + visited: Set of already visited references to prevent circular dependencies + + Returns: + The resolved schema definition with all nested $ref resolved + """ + if visited is None: + visited = set() + + if ref in visited: + return {"error": f"Circular reference detected: {ref}"} + + visited.add(ref) + + if not self.spec: + return {"error": "No spec loaded"} + + # Parse the reference path + # Supports both OpenAPI 3.0 (#/components/schemas/...) and Swagger 2.0 (#/definitions/...) + if not ref.startswith("#/"): + return {"error": f"Invalid reference format: {ref}"} + + ref_path = ref[2:].split("/") # Remove "#/" and split + + # Navigate through the spec to find the referenced schema + current = self.spec + for part in ref_path: + if isinstance(current, dict) and part in current: + current = current[part] + else: + return {"error": f"Reference not found: {ref}"} + + # Deep copy to avoid modifying the original spec + import copy + schema = copy.deepcopy(current) + + # Recursively resolve any nested $ref in the schema + schema = self._resolve_nested_refs(schema, visited) + + return schema + + def _resolve_nested_refs(self, obj, visited: set): + """Recursively resolve all $ref in a schema object""" + if isinstance(obj, dict): + if "$ref" in obj: + ref = obj["$ref"] + resolved = self.resolve_schema_ref(ref, visited.copy()) + # Merge any additional properties that might exist alongside $ref + for key, value in obj.items(): + if key != "$ref" and key not in resolved: + resolved[key] = value + return resolved + else: + return {key: self._resolve_nested_refs(value, visited) for key, value in obj.items()} + elif isinstance(obj, list): + return [self._resolve_nested_refs(item, visited) for item in obj] + else: + return obj + + def search_schemas(self, query: str) -> List[Dict]: + """Search schema definitions by name or description + + Args: + query: Search term to match against schema names + + Returns: + List of matching schema names and their descriptions + """ + results = [] + if not self.spec: + return results + + # Support both OpenAPI 3.0 (components/schemas) and Swagger 2.0 (definitions) + schemas = None + if "components" in self.spec and "schemas" in self.spec["components"]: + schemas = self.spec["components"]["schemas"] + prefix = "#/components/schemas/" + elif "definitions" in self.spec: + schemas = self.spec["definitions"] + prefix = "#/definitions/" + else: + return results + + query_lower = query.lower() + for schema_name, schema_def in schemas.items(): + if query_lower in schema_name.lower(): + description = "" + if isinstance(schema_def, dict): + description = schema_def.get("description", schema_def.get("title", "")) + + results.append({ + "name": schema_name, + "ref": f"{prefix}{schema_name}", + "description": description, + "type": schema_def.get("type", "object") if isinstance(schema_def, dict) else "unknown" + }) + + return results + + def get_schema_details(self, schema_name: str) -> Dict: + """Get full details for a specific schema with all references resolved + + Args: + schema_name: Name of the schema (e.g., "PersonaSimplificada") + + Returns: + The schema definition with all $ref resolved + """ + if not self.spec: + return {"error": "No spec loaded"} + + # Support both OpenAPI 3.0 and Swagger 2.0 + ref = None + if "components" in self.spec and "schemas" in self.spec["components"]: + if schema_name in self.spec["components"]["schemas"]: + ref = f"#/components/schemas/{schema_name}" + elif "definitions" in self.spec: + if schema_name in self.spec["definitions"]: + ref = f"#/definitions/{schema_name}" + + if not ref: + return {"error": f"Schema '{schema_name}' not found"} + + resolved = self.resolve_schema_ref(ref) + + return { + "name": schema_name, + "ref": ref, + "schema": resolved + } + async def run(self): async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await self.server.run(