feat: add search and get schema definition tools

This commit is contained in:
2025-10-15 11:12:02 -03:00
parent 45241f9853
commit 2a80c3e056

174
main.py
View File

@@ -121,6 +121,28 @@ class OpenAPIServer:
description="List all available API endpoints", description="List all available API endpoints",
inputSchema={"type": "object", "properties": {}}, inputSchema={"type": "object", "properties": {}},
), ),
types.Tool(
name="search_schemas",
description="Search schema definitions by name",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search term to match against schema names"}
},
"required": ["query"],
},
),
types.Tool(
name="get_schema",
description="Get a specific schema definition with all $ref references resolved",
inputSchema={
"type": "object",
"properties": {
"schema_name": {"type": "string", "description": "Name of the schema to retrieve"}
},
"required": ["schema_name"],
},
),
] ]
@self.server.call_tool() @self.server.call_tool()
@@ -155,6 +177,23 @@ class OpenAPIServer:
types.TextContent(type="text", text=json.dumps(results, indent=2)) types.TextContent(type="text", text=json.dumps(results, indent=2))
] ]
elif name == "search_schemas":
query = arguments.get("query", "") if arguments else ""
self.logger.debug(f"Searching schemas with query: {query}")
results = self.search_schemas(query)
self.logger.info(f"Found {len(results)} matching schemas")
return [
types.TextContent(type="text", text=json.dumps(results, indent=2))
]
elif name == "get_schema":
schema_name = arguments.get("schema_name", "") if arguments else ""
self.logger.debug(f"Getting schema details for: {schema_name}")
result = self.get_schema_details(schema_name)
return [
types.TextContent(type="text", text=json.dumps(result, indent=2))
]
else: else:
self.logger.error(f"Unknown tool requested: {name}") self.logger.error(f"Unknown tool requested: {name}")
raise ValueError(f"Unknown tool: {name}") raise ValueError(f"Unknown tool: {name}")
@@ -220,6 +259,141 @@ class OpenAPIServer:
) )
return results return results
def resolve_schema_ref(self, ref: str, visited: set = None) -> Dict:
"""Resolve a $ref reference to its actual schema definition
Args:
ref: The reference string (e.g., "#/components/schemas/MySchema" or "#/definitions/MySchema")
visited: Set of already visited references to prevent circular dependencies
Returns:
The resolved schema definition with all nested $ref resolved
"""
if visited is None:
visited = set()
if ref in visited:
return {"error": f"Circular reference detected: {ref}"}
visited.add(ref)
if not self.spec:
return {"error": "No spec loaded"}
# Parse the reference path
# Supports both OpenAPI 3.0 (#/components/schemas/...) and Swagger 2.0 (#/definitions/...)
if not ref.startswith("#/"):
return {"error": f"Invalid reference format: {ref}"}
ref_path = ref[2:].split("/") # Remove "#/" and split
# Navigate through the spec to find the referenced schema
current = self.spec
for part in ref_path:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return {"error": f"Reference not found: {ref}"}
# Deep copy to avoid modifying the original spec
import copy
schema = copy.deepcopy(current)
# Recursively resolve any nested $ref in the schema
schema = self._resolve_nested_refs(schema, visited)
return schema
def _resolve_nested_refs(self, obj, visited: set):
"""Recursively resolve all $ref in a schema object"""
if isinstance(obj, dict):
if "$ref" in obj:
ref = obj["$ref"]
resolved = self.resolve_schema_ref(ref, visited.copy())
# Merge any additional properties that might exist alongside $ref
for key, value in obj.items():
if key != "$ref" and key not in resolved:
resolved[key] = value
return resolved
else:
return {key: self._resolve_nested_refs(value, visited) for key, value in obj.items()}
elif isinstance(obj, list):
return [self._resolve_nested_refs(item, visited) for item in obj]
else:
return obj
def search_schemas(self, query: str) -> List[Dict]:
"""Search schema definitions by name or description
Args:
query: Search term to match against schema names
Returns:
List of matching schema names and their descriptions
"""
results = []
if not self.spec:
return results
# Support both OpenAPI 3.0 (components/schemas) and Swagger 2.0 (definitions)
schemas = None
if "components" in self.spec and "schemas" in self.spec["components"]:
schemas = self.spec["components"]["schemas"]
prefix = "#/components/schemas/"
elif "definitions" in self.spec:
schemas = self.spec["definitions"]
prefix = "#/definitions/"
else:
return results
query_lower = query.lower()
for schema_name, schema_def in schemas.items():
if query_lower in schema_name.lower():
description = ""
if isinstance(schema_def, dict):
description = schema_def.get("description", schema_def.get("title", ""))
results.append({
"name": schema_name,
"ref": f"{prefix}{schema_name}",
"description": description,
"type": schema_def.get("type", "object") if isinstance(schema_def, dict) else "unknown"
})
return results
def get_schema_details(self, schema_name: str) -> Dict:
"""Get full details for a specific schema with all references resolved
Args:
schema_name: Name of the schema (e.g., "PersonaSimplificada")
Returns:
The schema definition with all $ref resolved
"""
if not self.spec:
return {"error": "No spec loaded"}
# Support both OpenAPI 3.0 and Swagger 2.0
ref = None
if "components" in self.spec and "schemas" in self.spec["components"]:
if schema_name in self.spec["components"]["schemas"]:
ref = f"#/components/schemas/{schema_name}"
elif "definitions" in self.spec:
if schema_name in self.spec["definitions"]:
ref = f"#/definitions/{schema_name}"
if not ref:
return {"error": f"Schema '{schema_name}' not found"}
resolved = self.resolve_schema_ref(ref)
return {
"name": schema_name,
"ref": ref,
"schema": resolved
}
async def run(self): async def run(self):
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await self.server.run( await self.server.run(