""" Jira Integration for MCP Provides Jira API tools for issue tracking and project management. """ from typing import Dict, Any, List, Optional from atlassian import Jira from mcp import types from .base import BaseIntegration # Jira MCP Tool Definitions JIRA_TOOLS = [ types.Tool( name="jira_create_issue", description="Create a new Jira issue", inputSchema={ "type": "object", "properties": { "project_key": { "type": "string", "description": "Jira project key (e.g., 'DSS')" }, "summary": { "type": "string", "description": "Issue summary/title" }, "description": { "type": "string", "description": "Issue description" }, "issue_type": { "type": "string", "description": "Issue type (Story, Task, Bug, etc.)", "default": "Task" } }, "required": ["project_key", "summary"] } ), types.Tool( name="jira_get_issue", description="Get Jira issue details by key", inputSchema={ "type": "object", "properties": { "issue_key": { "type": "string", "description": "Issue key (e.g., 'DSS-123')" } }, "required": ["issue_key"] } ), types.Tool( name="jira_search_issues", description="Search Jira issues using JQL", inputSchema={ "type": "object", "properties": { "jql": { "type": "string", "description": "JQL query (e.g., 'project=DSS AND status=Open')" }, "max_results": { "type": "integer", "description": "Maximum number of results", "default": 50 } }, "required": ["jql"] } ), types.Tool( name="jira_update_issue", description="Update a Jira issue", inputSchema={ "type": "object", "properties": { "issue_key": { "type": "string", "description": "Issue key to update" }, "fields": { "type": "object", "description": "Fields to update (summary, description, status, etc.)" } }, "required": ["issue_key", "fields"] } ), types.Tool( name="jira_add_comment", description="Add a comment to a Jira issue", inputSchema={ "type": "object", "properties": { "issue_key": { "type": "string", "description": "Issue key" }, "comment": { "type": "string", "description": "Comment text" } }, "required": ["issue_key", "comment"] } ) ] class JiraIntegration(BaseIntegration): """Jira API integration with circuit breaker""" def __init__(self, config: Dict[str, Any]): """ Initialize Jira integration. Args: config: Must contain 'url', 'username', 'api_token' """ super().__init__("jira", config) url = config.get("url") username = config.get("username") api_token = config.get("api_token") if not all([url, username, api_token]): raise ValueError("Jira configuration incomplete: url, username, api_token required") self.jira = Jira( url=url, username=username, password=api_token, cloud=True ) async def create_issue( self, project_key: str, summary: str, description: str = "", issue_type: str = "Task" ) -> Dict[str, Any]: """Create a new Jira issue""" def _create(): fields = { "project": {"key": project_key}, "summary": summary, "description": description, "issuetype": {"name": issue_type} } return self.jira.create_issue(fields) return await self.call_api(_create) async def get_issue(self, issue_key: str) -> Dict[str, Any]: """Get issue details""" def _get(): return self.jira.get_issue(issue_key) return await self.call_api(_get) async def search_issues(self, jql: str, max_results: int = 50) -> Dict[str, Any]: """Search issues with JQL""" def _search(): return self.jira.jql(jql, limit=max_results) return await self.call_api(_search) async def update_issue(self, issue_key: str, fields: Dict[str, Any]) -> Dict[str, Any]: """Update issue fields""" def _update(): self.jira.update_issue_field(issue_key, fields) return {"status": "updated", "issue_key": issue_key} return await self.call_api(_update) async def add_comment(self, issue_key: str, comment: str) -> Dict[str, Any]: """Add comment to issue""" def _comment(): return self.jira.issue_add_comment(issue_key, comment) return await self.call_api(_comment) class JiraTools: """MCP tool executor for Jira integration""" def __init__(self, config: Dict[str, Any]): self.jira = JiraIntegration(config) async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """Execute Jira tool""" handlers = { "jira_create_issue": self.jira.create_issue, "jira_get_issue": self.jira.get_issue, "jira_search_issues": self.jira.search_issues, "jira_update_issue": self.jira.update_issue, "jira_add_comment": self.jira.add_comment } handler = handlers.get(tool_name) if not handler: return {"error": f"Unknown Jira tool: {tool_name}"} try: clean_args = {k: v for k, v in arguments.items() if not k.startswith("_")} result = await handler(**clean_args) return result except Exception as e: return {"error": str(e)}