Jira AI Agentic workflow: From Customer Emails to Automatic Jira Tickets


Improving customer support efficiency is a key challenge for every successful business. Processing and tracking error reports is often a time-consuming process that can divert valuable resources from other important tasks. In this tutorial, we will demonstrate how to create an AI agent that automatically monitors Gmail accounts, detects customer error reports, analyzes their content, and then creates the appropriate Jira tickets.

This system automatically processes incoming emails and creates Jira tickets from them. Here’s how it works step by step:

  1. Email Monitoring: Every 30 seconds, the system checks for unread emails and downloads them together with their attachments and thread history. It then marks each email as “read” so it won’t be processed again in future runs.
  2. AI Analysis: An artificial intelligence analyzes the emails to understand their content.
  3. Project Identification: The system first queries the available Jira projects so the AI can identify which project the email belongs to.
  4. Information Extraction: The AI extracts the necessary data from the email to create a Jira ticket.
  5. Automatic Feedback: If the information is unclear or incomplete, the AI automatically writes a reply to the sender requesting more details, and no ticket is created yet. When the sender responds, the reply arrives as unread, so the system picks up the thread again on a later run.
  6. Ticket Creation: If all necessary information is available, the system automatically creates the appropriate Jira ticket through the JIRA REST API and attaches the email attachments as well.
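
The steps above can be sketched as a single decision per email thread. This is only a minimal illustration of the control flow, not the implementation used later in this tutorial: the Analysis class and the handle_thread() function are hypothetical names, and the AI call, the Jira call and the Gmail reply are passed in as plain callables so the sketch runs without any external service.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Analysis:
    """Hypothetical container for what the AI extracts from one email thread."""
    need_more_information: bool
    project_id: str = ""
    issue_name: str = ""
    description: str = ""
    additional_info_needed: str = ""


def handle_thread(
    thread: dict,
    analyze: Callable[[dict], Analysis],
    create_ticket: Callable[[Analysis, List[str]], None],
    send_reply: Callable[[str, str], None],
) -> str:
    """Run steps 2-6 for one unread thread and report which branch was taken."""
    analysis = analyze(thread)  # steps 2-4: AI analysis and extraction
    if analysis.need_more_information:
        # step 5: ask the sender for the missing details
        send_reply(thread["email_id"], analysis.additional_info_needed)
        return "request_more_info"
    # step 6: everything is known, so create the ticket with the attachments
    create_ticket(analysis, thread.get("attachments", []))
    return "create_ticket"


# Usage with stand-in callables instead of real Gmail, Jira and LLM calls
sent, created = [], []
result = handle_thread(
    {"email_id": "t1", "attachments": []},
    analyze=lambda t: Analysis(True, additional_info_needed="Which product is affected?"),
    create_ticket=lambda a, files: created.append(a),
    send_reply=lambda thread_id, text: sent.append((thread_id, text)),
)
```

Passing the three operations in as parameters keeps the branching logic testable on its own, independent of the integrations.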

Implementation:

You’ll need the following packages:

pip install python-dotenv langchain langchain-anthropic langgraph google-api-python-client google-auth-oauthlib requests fastapi uvicorn apscheduler
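
The code in this tutorial reads four environment variables through os.getenv(): JIRA_USER, JIRA_API_KEY, JIRA_DOMAIN and ANTHROPIC_API_KEY, typically loaded from a .env file by load_dotenv(). As a small optional sketch, a startup check along these lines could report missing values early. The names REQUIRED_ENV_VARS and missing_env_vars() are hypothetical helpers and not part of any library.

```python
import os
from typing import Iterable, List, Mapping

# Variable names taken from the os.getenv() calls used throughout this tutorial
REQUIRED_ENV_VARS = ("JIRA_USER", "JIRA_API_KEY", "JIRA_DOMAIN", "ANTHROPIC_API_KEY")


def missing_env_vars(
    required: Iterable[str] = REQUIRED_ENV_VARS,
    env: Mapping[str, str] = os.environ,
) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not env.get(name)]
```

Calling missing_env_vars() right after load_dotenv() and stopping when the list is non-empty would surface a configuration problem before the first scheduled run.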

The code below implements a Gmail integration using the Google API. It authenticates with Google’s services through the get_authentication() function, which manages OAuth credentials, storing and refreshing tokens as needed. The core functionality revolves around get_unread_messages_with_threads(), which retrieves unread customer emails, automatically marks them as read, and collects their complete conversation history. Messages are processed by extract_mail(), which pulls out critical metadata like subject, sender, and timestamp. The get_message_body_and_attachments() function handles parsing both plain text and HTML content while identifying attachments. When attachments are present, download_attachment() can save them locally or return the binary data. Finally, the reply_to_thread() function enables automated responses to email threads, properly formatting replies with appropriate headers and references to maintain conversation context. Together, these functions create a comprehensive system for monitoring, processing, and responding to customer communications via Gmail.

import os
import os.path
import base64
import re
from email.mime.text import MIMEText

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.errors import HttpError

# If modifying these scopes, delete the token.json file
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/gmail.modify']


def get_authentication():
    """
    Handles the authentication process with Google API
    Returns an authenticated service object
    """
    creds = None
    # The file token.json stores the user's access and refresh tokens
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)

    # If there are no valid credentials, let the user log in
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)

        # Save the credentials for the next run
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    return build('gmail', 'v1', credentials=creds)


def get_unread_messages_with_threads(user_id='me', label_ids=None, max_results=20):
    """
    Retrieves unread messages with the specified label, including their complete history and the text of the original messages.

    Args:
        user_id: The user's email address or 'me' for the authenticated user.
        label_ids: List of desired label identifiers.
        max_results: Maximum number of messages to retrieve.

    Returns:
        List of unread messages with their complete history, the text of the original messages, and their attachments.
    """
    try:
        service = get_authentication()
        response = service.users().messages().list(
            userId=user_id,
            labelIds=label_ids,
            q='is:unread',
            maxResults=max_results
        ).execute()

        messages = []

        if 'messages' not in response:
            print("There are no unread messages with the specified label.")
            return messages

        for msg in response['messages']:

            email_conversation = []
            service.users().messages().modify(
                userId=user_id,
                id=msg["id"],
                body={'removeLabelIds': ['UNREAD'],'addLabelIds': []}
            ).execute()

            if msg.get('threadId'):
                thread = service.users().threads().get(userId=user_id, id=msg['threadId']).execute()
                for message in thread['messages']:
                    extract_mail(email_conversation, message)
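            else:
                # messages.list() returns only IDs, so fetch the full message before parsing it
                full_msg = service.users().messages().get(userId=user_id, id=msg['id'], format='full').execute()
                extract_mail(email_conversation, full_msg)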

            messages.append({
                'email_conversation': email_conversation,
                'subject': email_conversation[0]['subject'] if email_conversation else 'N/A',
                'email_id': msg.get('threadId'),
                'has_attachments': any(len(msg['attachments']) > 0 for msg in email_conversation)
            })

        return messages

    except HttpError as error:
        print(f"Error: {error}")
        return []


def extract_mail(email_conversation, message):
    msg_id = message['id']
    payload = message['payload']
    headers = payload.get('headers', [])
    subject = next((header['value'] for header in headers if header['name'] == 'Subject'), 'N/A')
    from_email = next((header['value'] for header in headers if header['name'] == 'From'), 'N/A')
    date = next((header['value'] for header in headers if header['name'] == 'Date'), 'N/A')
    email_id = msg_id

    body, attachments = get_message_body_and_attachments(payload, msg_id)
    email_conversation.append({
        'email_id': email_id,
        'subject': subject,
        'from': from_email,
        'date': date,
        'body': body,
        'attachments': attachments
    })



def get_message_body_and_attachments(payload, msg_id):
    """
    Extracts the message body and attachments from the payload.

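    Args:
       payload: The 'payload' part of a Gmail API message resource.
       msg_id: The ID of the message, used to download its attachments.
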
    Returns:
       Tuple containing:
       - The message body as text.
       - List of attachments (filename, size, mime-type, attachment_id).
    """
    body = ''
    attachments = []

    def process_parts(parts):
        nonlocal body
        nonlocal attachments

        for part in parts:
            mime_type = part.get('mimeType', '')

            # If this is a text part
            if mime_type == 'text/plain' and not body:  # Only set the body if it has not been set yet
                data = part.get('body', {}).get('data')
                if data:
                    body = base64.urlsafe_b64decode(data.encode('ASCII')).decode('utf-8')

            # If this is an HTML part and we don't have a text body yet
            elif mime_type == 'text/html' and not body:
                data = part.get('body', {}).get('data')
                if data:
                    # Fall back to the raw HTML (tags are not stripped here)
                    html_body = base64.urlsafe_b64decode(data.encode('ASCII')).decode('utf-8')
                    body = html_body

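            # Parts whose body carries an attachmentId are attachments
            # (the filename lives on the part itself, not on its body)
            elif part.get('body', {}).get('attachmentId'):
                filename = part.get('filename') or 'unnamed_attachment'
                attachment_id = part['body']['attachmentId']

                # Make sure the download directory exists before saving
                os.makedirs('./files', exist_ok=True)
                saved_path = download_attachment(
                    service=get_authentication(),
                    attachment_id=attachment_id,
                    message_id=msg_id,
                    user_id="me",
                    save_path=f"./files/{filename}"
                )
                # Skip attachments that could not be downloaded
                if saved_path:
                    attachments.append(saved_path)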

            if 'parts' in part:
                process_parts(part['parts'])

    if 'parts' in payload:
        process_parts(payload['parts'])
    else:
        mime_type = payload.get('mimeType', '')
        if mime_type == 'text/plain' or mime_type == 'text/html':
            data = payload.get('body', {}).get('data')
            if data:
                body = base64.urlsafe_b64decode(data.encode('ASCII')).decode('utf-8')

    return body, attachments


def download_attachment(service, user_id, message_id, attachment_id, save_path=None):
    """
    Downloads the attachment from the specified message.

    Args:
       service: The Gmail API service.
       user_id: The user ID.
       message_id: The message ID.
       attachment_id: The attachment ID.
       save_path: The save location. If None, just returns the bytes.

    Returns:
       The attachment in bytes form, or the file path if save_path is specified.
    """
    try:
        attachment = service.users().messages().attachments().get(
            userId=user_id,
            messageId=message_id,
            id=attachment_id
        ).execute()

        data = attachment.get('data')
        if not data:
            return None

        file_data = base64.urlsafe_b64decode(data.encode('UTF-8'))

        if save_path:
            with open(save_path, 'wb') as f:
                f.write(file_data)
            return save_path
        else:
            return file_data

    except Exception as error:
        print(f"Error: {error}")
        return None

def reply_to_thread(thread_id, message_text):
    service = get_authentication()
    thread = service.users().threads().get(userId="me", id=thread_id).execute()
    messages = thread.get('messages', [])
    last_message = messages[-1]

    # Header names are case-insensitive (e.g. 'Message-ID' vs 'Message-Id'), so normalize them
    headers = {h['name'].lower(): h['value'] for h in last_message['payload']['headers']}

    # Extract a clean email address
    email_address = re.search(r'[\w.-]+@[\w.-]+', headers['from']).group()

    # Prepare the reply message
    reply_msg = MIMEText(message_text)
    reply_msg['To'] = email_address
    reply_msg['Subject'] = "Re: " + headers.get('subject', '')
    reply_msg['In-Reply-To'] = headers['message-id']
    reply_msg['References'] = headers['message-id']

    # Encode and send message
    raw_message = base64.urlsafe_b64encode(reply_msg.as_bytes()).decode()
    message = {'raw': raw_message, 'threadId': thread_id}

    sent_message = service.users().messages().send(userId="me", body=message).execute()
    print(f"Message sent! ID: {sent_message['id']}")
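
One detail worth noting: when a message has no plain-text part, get_message_body_and_attachments() keeps the raw HTML as the body, so the AI later receives markup along with the text. If that turns out to be a problem, a small converter based on Python's standard html.parser module could be applied first. The following is a rough sketch under that assumption; html_to_text() and _TextExtractor are hypothetical names, and the handling of tags is deliberately simplistic.

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collects visible text and skips the contents of <script> and <style>."""

    def __init__(self):
        super().__init__()
        self._chunks = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1
        elif tag in ("br", "p", "div", "li"):
            # Treat common block-level tags as line breaks
            self._chunks.append("\n")

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth:
            self._chunks.append(data)

    def text(self):
        # Trim each line and drop the empty ones
        lines = (line.strip() for line in "".join(self._chunks).splitlines())
        return "\n".join(line for line in lines if line)


def html_to_text(html_body: str) -> str:
    """Hypothetical helper: reduce an HTML email body to plain text."""
    parser = _TextExtractor()
    parser.feed(html_body)
    parser.close()
    return parser.text()
```

If adopted, the text/html branch could assign body = html_to_text(html_body) instead of the raw HTML.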

As the next step, let’s write the Jira integration. You will need a Jira API token, which you can create at https://id.atlassian.com/manage-profile/security/api-tokens. The get_session() function creates an authenticated session using environment variables for the Jira username and API key. The get_projects() function retrieves a list of available Jira projects, returning their IDs and names.

The attach_to_issue() function uploads a file attachment to an existing Jira issue by specifying the issue key and the local file path. It handles opening the file in binary mode and properly configuring the request headers. The create_jira_issue() function creates a new Jira issue with a specified title, project ID, and description. It also supports attaching multiple files to the newly created issue by calling attach_to_issue() for each attachment path provided.

import os
from typing import Dict, List, Optional

import requests


def get_session():
    session = requests.Session()
    session.auth = (os.getenv("JIRA_USER"), os.getenv("JIRA_API_KEY"))
    return session


def get_projects():
    url = f"https://{os.getenv('JIRA_DOMAIN')}.atlassian.net/rest/api/3/project/search"
    response = get_session().get(url)

    projects = []
    if response.status_code == 200:
        response_json = response.json()
        for project in response_json["values"]:
            projects.append({"id": project["id"], "name": project["name"]})
    else:
        print(response.status_code)
        print(response.text)

    return projects


def attach_to_issue(issue_key: str, attachment_path: str) -> Dict:
    """Attach a file to an existing Jira issue

    Args:
        issue_key: The key of the issue (e.g., 'PROJ-123')
        attachment_path: Path to the file to be attached

    Returns:
        Response data from the attachment request
    """
    url = f"https://{os.getenv('JIRA_DOMAIN')}.atlassian.net/rest/api/3/issue/{issue_key}/attachments"

    headers = {
        "X-Atlassian-Token": "no-check"
    }

    with open(attachment_path, 'rb') as file:
        files = {'file': (os.path.basename(attachment_path), file)}
        session = get_session()
        response = session.post(url, headers=headers, files=files)

    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"Failed to attach file: {response.status_code} Error: {response.text}")


def create_jira_issue(issue_name: str, project_id: str, description: str,
                      attachments: Optional[List[str]] = None) -> Dict:
    """
    Create a Jira issue with the provided parameters

    Args:
        issue_name: Title of the issue
        project_id: ID of the project in Jira
        description: Detailed description of the issue
        attachments: List of file paths to attach to the issue

    Returns:
        Dictionary with the created issue's key and ID
    """
    url = f"https://{os.getenv('JIRA_DOMAIN')}.atlassian.net/rest/api/3/issue"

    if attachments is None:
        attachments = []

    issue_data = {
        "fields": {
            "summary": issue_name,
            "project": {"id": project_id},
            "description": {
                "type": "doc",
                "version": 1,
                "content": [
                    {
                        "type": "paragraph",
                        "content": [
                            {
                                "type": "text",
                                "text": description
                            }
                        ]
                    }
                ]
            },
            # Issue type IDs are specific to each Jira instance; replace this with the ID used in yours
            "issuetype": {"id": "10002"},
        },
        "update": {}
    }

    session = get_session()
    response = session.post(url, json=issue_data)

    if response.status_code in (200, 201):
        issue = response.json()
        print(f"Issue created successfully: {issue['key']}")

        if attachments:
            for attachment_path in attachments:
                attach_to_issue(issue["key"], attachment_path)

        return issue
    else:
        raise Exception(f"Failed to create issue: {response.status_code} Error: {response.text}")
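
The create_jira_issue() function above places the whole description into a single text node of the Atlassian Document Format (ADF). For multi-line descriptions it may be preferable to emit one paragraph per line. The sketch below shows one possible way to do that; text_to_adf() is a hypothetical helper, and mapping each non-empty line to its own paragraph is a simplifying assumption rather than the only valid ADF structure.

```python
from typing import Any, Dict, List


def text_to_adf(description: str) -> Dict[str, Any]:
    """Hypothetical helper: wrap plain text in an ADF document, one paragraph per non-empty line."""
    paragraphs: List[Dict[str, Any]] = []
    for line in description.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines instead of emitting empty text nodes
        paragraphs.append({
            "type": "paragraph",
            "content": [{"type": "text", "text": line}],
        })
    return {"type": "doc", "version": 1, "content": paragraphs}
```

With this helper, the "description" field of issue_data could be set to text_to_adf(description).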

Now that the necessary integrations are in place, it’s time to build the AI agentic workflow, which is structured as follows. The issue_data_agent receives the incoming customer email together with the Jira project list. It tries to identify the essential information, such as the project identifier and the problem name and description, and decides whether there is enough information to create a ticket or whether additional data is needed.

When all necessary information is available, the create_issue_ticket() function automatically creates a Jira ticket, attaching the information and attachments found in the email. However, if some important data is missing, the create_email_response_agent writes an email to the customer, explaining exactly what additional information the support team needs.

The router() function decides in which direction the process continues: it either takes the “create_ticket” route and creates a bug ticket, or it takes the “request_more_info” route and requests additional information.

import os
from typing import List, TypedDict, Literal, Dict, Any

from langchain_anthropic import ChatAnthropic
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langchain_core.prompts import PromptTemplate
from pydantic import SecretStr

from langgraph.graph import StateGraph, END

from jira import get_projects, create_jira_issue
from mail import reply_to_thread


class State(TypedDict):
    email_id: str
    email_conversation: List[Dict[str, Any]]
    current_mail: str
    need_more_information: bool
    project_id: str
    issue_name: str
    description: str
    additional_info_needed: str
    response_mail: str
    mail_subject: str


def get_model():
    return ChatAnthropic(
        anthropic_api_key=SecretStr(os.getenv('ANTHROPIC_API_KEY')),
        model="claude-3-7-sonnet-20250219"
    )


def create_issue_data_agent():
    prompt_str = """
    You are an assistant that extracts issue information from customer emails to create issue tickets.

    # Instructions
    1. Analyze the customer email to extract all necessary information.
    2. Extract the project ID by matching information in the email with the provided project list. Be flexible with partial matches or descriptions.
    3. Extract other required fields: issue name, issue description
    4. Only set need_more_information to true if critical information cannot be determined after attempting reasonable inference.
    5. When need_more_information is true, provide a specific description in additional_info_needed about exactly what information is missing.

    # Available Projects
    <projects>
        {projects}
    </projects>

    # Mail subject
    <mail_subject>
        {subject}
    </mail_subject>

    # Emails
    <history>
        {email_history}
    </history>

    # Response Format
    Return a JSON object that follows the IssueData structure:
    {{
        "need_more_information": boolean,  // only true if critical information cannot be determined
        "project_id": string,              // matching project ID
        "name": string,                    // concise issue name/title
        "description": string,             // detailed issue description
        "additional_info_needed": string   // specific description of missing information, empty if not needed
    }}
    """
    prompt = PromptTemplate(template=prompt_str, input_variables=["projects", "email_history", "subject"])
    return prompt | get_model() | JsonOutputParser()


def create_email_response_agent():
    prompt_str = """
    You are a helpful customer support agent responding to a customer's issue report. 
    Your task is to craft a professional and empathetic email requesting the additional information needed to properly address their issue.

    # Conversation
    <history>
        {email_conversation}
    </history>

    # The information that is needed
    <additional_info_needed>
        {additional_info_needed}
    </additional_info_needed>

    # Instructions
    1. Start with a polite greeting and thank the customer for their message.
    2. Acknowledge their issue briefly to show understanding.
    3. Explain that you need additional information to proceed with resolving their issue.
    4. Clearly list the specific information needed as mentioned in the additional_info_needed section.
    5. Provide a clear explanation for why this information is necessary.
    6. End with a friendly closing, including your name and support team.
    7. Keep the tone professional, helpful, and empathetic.
    8. Use simple, clear language that is easy to understand.

    # Response Format
    Write a complete email response that follows the above instructions. 
    Do not use placeholders or template variables in your response.
    """
    prompt = PromptTemplate(template=prompt_str,
                            input_variables=["email_conversation", "additional_info_needed"])
    return prompt | get_model() | StrOutputParser()




def extract_issue_data(state: State) -> State:
    """Extract issue data from the customer email"""
    projects = get_projects()

    issue_data_agent = create_issue_data_agent()
    result = issue_data_agent.invoke({
        "projects": "\n".join([f"{p['id']}: {p['name']}" for p in projects]),
        "email_history": [conv["body"] for conv in state["email_conversation"]],
        "subject": state["mail_subject"]
    })

    state["need_more_information"] = result["need_more_information"]
    state["project_id"] = result["project_id"]
    state["issue_name"] = result["name"]
    state["description"] = result["description"]
    state["additional_info_needed"] = result["additional_info_needed"]

    return state


def create_issue_ticket(state: State) -> State:
    """Create an issue ticket in the system"""
    attachments = []
    for conv in state["email_conversation"]:
        attachments.extend(conv["attachments"])
    create_jira_issue(
        issue_name=state['issue_name'],
        description=state['description'],
        project_id=state['project_id'],
        attachments=attachments,
    )
    return state


def generate_more_info_email(state: State) -> State:
    """Generate an email requesting more information"""
    email_response_agent = create_email_response_agent()
    response = email_response_agent.invoke({
        "email_conversation":  [conv["body"] for conv in state["email_conversation"]],
        "additional_info_needed": state["additional_info_needed"]
    })
    reply_to_thread(thread_id=state["email_id"], message_text=response)
    return state


def router(state: State) -> Literal["create_ticket", "request_more_info"]:
    """Route based on whether more information is needed"""
    if state["need_more_information"]:
        return "request_more_info"
    else:
        return "create_ticket"


def build_issue_workflow_graph():
    workflow = StateGraph(State)


    workflow.add_node("extract_issue_data", extract_issue_data)
    workflow.add_node("create_ticket", create_issue_ticket)
    workflow.add_node("request_more_info", generate_more_info_email)

    workflow.set_entry_point("extract_issue_data")
    workflow.add_conditional_edges(
        "extract_issue_data",
        router,
        {
            "create_ticket": "create_ticket",
            "request_more_info": "request_more_info"
        }
    )

    workflow.add_edge("create_ticket", END)
    workflow.add_edge("request_more_info", END)

    return workflow.compile()
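
The workflow above trusts the JSON returned by the model. Since the model could return a project ID that is not in the project list, or leave a field empty, a defensive check before routing might be useful. The following is a minimal sketch of such a guard; validate_issue_data() and REQUIRED_KEYS are hypothetical names, and the fallback wording is only an example.

```python
from typing import Any, Dict, List

# Keys requested from the model in the issue_data_agent prompt
REQUIRED_KEYS = ("need_more_information", "project_id", "name", "description", "additional_info_needed")


def validate_issue_data(result: Dict[str, Any], projects: List[Dict[str, str]]) -> Dict[str, Any]:
    """Hypothetical guard: fall back to asking for more information when the model output is unusable."""
    checked = {key: result.get(key, "") for key in REQUIRED_KEYS}
    checked["need_more_information"] = bool(result.get("need_more_information", False))
    checked["project_id"] = str(checked["project_id"])

    known_ids = {str(p["id"]) for p in projects}
    problems = []
    if checked["project_id"] not in known_ids:
        problems.append("the affected project or product")
    if not str(checked["name"]).strip() or not str(checked["description"]).strip():
        problems.append("a description of the problem")

    # Only override the model's decision when it claimed to have everything
    if problems and not checked["need_more_information"]:
        checked["need_more_information"] = True
        checked["additional_info_needed"] = "Please provide " + " and ".join(problems) + "."
    return checked
```

In extract_issue_data(), the result could be passed through validate_issue_data(result, projects) before its values are copied into the state.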

And let’s put it all together. Let’s create a FastAPI server that runs the previously defined workflow every 30 seconds, thus starting the automatic email processing.

from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
import uvicorn
from dotenv import load_dotenv

from ai import build_issue_workflow_graph
from mail import get_unread_messages_with_threads

scheduler = AsyncIOScheduler()


@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.start()
    yield
    scheduler.shutdown()


app = FastAPI(lifespan=lifespan)


@scheduler.scheduled_job(trigger=IntervalTrigger(seconds=30))
async def process_emails():
    print("Email processing started")
    processed_emails = get_unread_messages_with_threads()
    graph = build_issue_workflow_graph()
    for email in processed_emails:
        # Keys and types must match the State definition of the workflow
        initial_state = {
            "email_id": email["email_id"],
            "email_conversation": email["email_conversation"],
            "need_more_information": False,
            "additional_info_needed": "",
            "project_id": "",
            "issue_name": "",
            "description": "",
            "mail_subject": email['subject'],
        }
        final_state = graph.invoke(initial_state)
        print(final_state)
    print("Email processing end!")


@app.post("/start-email-processing")
async def start_email_processing(background_tasks: BackgroundTasks):
    background_tasks.add_task(process_emails)
    return {"message": "Manual processing started"}




if __name__ == "__main__":
    load_dotenv()
    uvicorn.run(app, host="0.0.0.0", port=8000)
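
Because emails are marked as read before they are processed, and create_jira_issue() raises an exception on a failed request, an error on one email in process_emails() would likely skip the rest of the batch. One possible mitigation is to isolate each email in its own try/except. This is a small sketch under that assumption; process_batch() is a hypothetical helper, and what to do with the failures (logging, re-marking as unread, alerting) is left open.

```python
from typing import Any, Callable, Dict, List, Tuple


def process_batch(
    emails: List[Dict[str, Any]],
    handle: Callable[[Dict[str, Any]], Any],
) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Run handle() on every email and keep going when one of them fails."""
    succeeded: List[str] = []
    failed: List[Tuple[str, str]] = []
    for email in emails:
        email_id = email.get("email_id", "unknown")
        try:
            handle(email)
            succeeded.append(email_id)
        except Exception as error:  # broad on purpose: one bad email must not stop the run
            failed.append((email_id, str(error)))
    return succeeded, failed
```

Here handle would wrap the construction of the initial state and the graph.invoke() call for a single email.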

So let’s test the solution with the following email I sent with an attachment.

And the following ticket was created in Jira.

In this article, we built an efficient AI agent that can analyze customer emails and automatically create Jira tasks.

