diff --git a/ChangeLog.md b/ChangeLog.md index eb90523..f9161a8 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,3 +1,10 @@ +# v0.0.2.1 + +## New Features +- Workflow: add base functionality for workflows +- People: add base functionality for people +- core: automatically get api user person object (optional, default: True) + # v0.0.2 ## New Features diff --git a/README.md b/README.md index a01c43c..77b9958 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,9 @@ This module is used to interact with the Xurrent API. It provides a set of classes to interact with the API. +## Change Log + +[ChangeLog.md](https://github.com/fasteiner/xurrent-python/blob/main/ChangeLog.md) ## Usage @@ -19,6 +22,10 @@ This module is used to interact with the Xurrent API. It provides a set of class x_api_helper = XurrentApiHelper(baseUrl, apitoken, account) + # Plain API Call + uri = "/requests?subject=Example Subject" + connection_object.api_call(uri, 'GET') + ``` ### Requests diff --git a/pyproject.toml b/pyproject.toml index f35dba4..b0e31b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "xurrent" -version = "0.0.2" +version = "0.0.2.1" authors = [ { name="Fabian Steiner", email="fabian@stei-ner.net" }, ] diff --git a/src/xurrent/core.py b/src/xurrent/core.py index 076cb09..20dcc12 100644 --- a/src/xurrent/core.py +++ b/src/xurrent/core.py @@ -1,3 +1,4 @@ +from __future__ import annotations # Needed for forward references import logging import requests import json @@ -26,12 +27,17 @@ def to_json(self): class XurrentApiHelper: + api_user: Person # Forward declaration with a string - def __init__(self, base_url, api_key, api_account): + def __init__(self, base_url, api_key, api_account, resolve_user=True): self.base_url = base_url self.api_key = api_key self.api_account = api_account self.logger = logging.getLogger(__name__) + if resolve_user: + # Import Person lazily + from .people import Person + self.api_user = Person.get_me(self) def __append_per_page(self, uri, per_page=100): """ @@ -39,7 +45,7 @@ def __append_per_page(self, uri, per_page=100): :param uri: URI to append the parameter to :param per_page: Number of records per page :return: URI with the 'per_page' parameter appended - >>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account') + >>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account', False) >>> helper._XurrentApiHelper__append_per_page('https://api.example.com/tasks') 'https://api.example.com/tasks?per_page=100' >>> helper._XurrentApiHelper__append_per_page('https://api.example.com/tasks?status=open') @@ -138,7 +144,7 @@ def custom_fields_to_object(self, custom_fields): :param custom_fields: List of custom fields :return: Dictionary containing the custom fields - >>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account') + >>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account', False) >>> helper.custom_fields_to_object([{'id': 'priority', 'value': 'high'}, {'id': 'status', 'value': 'open'}]) {'priority': 'high', 'status': 'open'} """ @@ -153,7 +159,7 @@ def object_to_custom_fields(self, obj): :param obj: Dictionary to convert :return: List of custom fields - >>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account') + >>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account', False) >>> helper.object_to_custom_fields({'priority': 'high', 'status': 'open'}) [{'id': 'priority', 'value': 'high'}, {'id': 'status', 'value': 'open'}] """ @@ -167,7 +173,7 @@ def create_filter_string(self, filter: dict): Create a filter string from a dictionary. :param filter: Dictionary containing the filter parameters :return: String containing the filter parameters - >>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account') + >>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account', False) >>> helper.create_filter_string({'status': 'open', 'priority': 'high'}) 'status=open&priority=high' >>> helper.create_filter_string({'status': 'open'}) diff --git a/src/xurrent/people.py b/src/xurrent/people.py new file mode 100644 index 0000000..73970bf --- /dev/null +++ b/src/xurrent/people.py @@ -0,0 +1,74 @@ +from .core import XurrentApiHelper +from typing import Optional, List, Dict, Type, TypeVar + + +T = TypeVar('T', bound='Person') + +class Person(): + #https://developer.4me.com/v1/people/ + resourceUrl = 'people' + + def __init__(self, connection_object: XurrentApiHelper, id, name: str = None, primary_email: str = None,**kwargs): + self._connection_object = connection_object + self.id = id + self.name = name + self.primary_email = primary_email + for key, value in kwargs.items(): + setattr(self, key, value) + + def __update_object__(self, data) -> None: + if data.get('id') != self.id: + raise ValueError(f"ID mismatch: {self.id} != {data.get('id')}") + for key, value in data.items(): + setattr(self, key, value) + + def __str__(self) -> str: + """ + Return a string representation of the object. + """ + return f"Person(id={self.id}, name={self.name}, primary_email={self.primary_email})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id): + uri = f'{connection_object.base_url}/{cls.resourceUrl}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_me(cls, connection_object: XurrentApiHelper): + """ + Retrieve the person object for the authenticated user. + """ + uri = f'{connection_object.base_url}/{cls.resourceUrl}/me' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + def update(self, data): + uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + self.__update_object__(response) + return self + + def disable(self, prefix: str = '', postfix: str = ''): + """ + Disable the person and update the name. + + :param prefix: Prefix to add to the name + :param postfix: Postfix to add to the name + :return: Updated person object + """ + uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', { + 'disabled': 'true', + "name": f"{prefix}{self.name}{postfix}" + }) + self.__update_object__(response) + return self + + \ No newline at end of file diff --git a/src/xurrent/requests.py b/src/xurrent/requests.py index cb7b5af..26c7c9a 100644 --- a/src/xurrent/requests.py +++ b/src/xurrent/requests.py @@ -92,8 +92,8 @@ def __str__(self) -> str: def from_data(cls, connection_object: XurrentApiHelper, data) -> T: if not isinstance(data, dict): raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") - if not 'id' in data: - data['id'] = id + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") return cls(connection_object, **data) diff --git a/src/xurrent/tasks.py b/src/xurrent/tasks.py index 92a4fe0..0a3ed3a 100644 --- a/src/xurrent/tasks.py +++ b/src/xurrent/tasks.py @@ -22,7 +22,3 @@ def update(connection_object: XurrentApiHelper, id, data): uri = f'{connection_object.base_url}/{Task.resourceUrl}/{id}' return connection_object.api_call(uri, 'PATCH', data) - @staticmethod - def delete(connection_object: XurrentApiHelper, id): - uri = f'{connection_object.base_url}/{Task.resourceUrl}/{id}' - return connection_object.api_call(uri, 'DELETE') diff --git a/src/xurrent/workflows.py b/src/xurrent/workflows.py index d9b4237..c349eeb 100644 --- a/src/xurrent/workflows.py +++ b/src/xurrent/workflows.py @@ -1,36 +1,182 @@ +from datetime import datetime +from typing import Optional, List, Dict from .core import XurrentApiHelper +from enum import Enum +class WorkflowCompletionReason(str, Enum): + withdrawn = "withdrawn" # Withdrawn - Withdrawn by Requester + rejected = "rejected" # Rejected - Rejected by Approver + rolled_back = "rolled_back" # Rolled Back - Original Environment Restored + failed = "failed" # Failed - No Requirements Met + partial = "partial" # Partial - Not All Requirements Met + disruptive = "disruptive" # Disruptive - Caused Service Disruption + complete = "complete" # Complete - All Requirements Met -class Workflow(): - #https://developer.4me.com/v1/workflows/ +class WorkflowStatus(str, Enum): + being_created = "being_created" # Being Created + registered = "registered" # Registered + in_progress = "in_progress" # In Progress + progress_halted = "progress_halted" # Progress Halted + completed = "completed" # Completed + # risk_and_impact = "risk_and_impact" # Risk & Impact — deprecated: replaced by in_progress + # approval = "approval" # Approval — deprecated: replaced by in_progress + # implementation = "implementation" # Implementation — deprecated: replaced by in_progress + + +class WorkflowPredefinedFilter(str, Enum): + """ + Predefined filters for tasks. + """ + open = 'open' # /workflows/completed: List all completed workflows + completed = 'completed' #/workflows/open: List all open workflows + managed_by_me = 'managed_by_me' #/workflows/managed_by_me: List all workflows which manager is the API user + + +class Workflow: + # Endpoint for workflows resourceUrl = 'workflows' - @staticmethod - def get_by_id(connection_object: XurrentApiHelper, id): - uri = f'{connection_object.base_url}/{Workflow.resourceUrl}/{id}' + def __init__(self, + connection_object: XurrentApiHelper, + id: int, + subject: Optional[str] = None, + status: Optional[str] = None, + created_at: Optional[datetime] = None, + updated_at: Optional[datetime] = None, + **kwargs): + self.id = id + self._connection_object = connection_object + self.subject = subject + self.status = status + self.created_at = created_at + self.updated_at = updated_at + for key, value in kwargs.items(): + setattr(self, key, value) + + def __str__(self) -> str: + """Provide a human-readable string representation of the object.""" + return (f"Workflow(id={self.id}, subject={self.subject}, status={self.status}, " + f"category={self.category}, impact={self.impact})") + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data: dict): + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> dict: + """ + Retrieve a workflow by its ID. + """ + uri = f'{connection_object.base_url}/{cls.resourceUrl}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_workflow_tasks_by_workflow_id(cls, connection_object: XurrentApiHelper, id: int) -> List[dict]: + """ + Retrieve all tasks associated with a workflow by its ID. + """ + uri = f'{connection_object.base_url}/{cls.resourceUrl}/{id}/tasks' return connection_object.api_call(uri, 'GET') - @staticmethod - def get_workflow_task_by_template_id(connection_object: XurrentApiHelper, workflowID: int, templateID: int): - uri = f'{connection_object.base_url}/{Workflow.resourceUrl}/{workflowID}/tasks?template={templateID}' + @classmethod + def get_workflow_task_by_template_id(cls, connection_object: XurrentApiHelper, workflowID: int, templateID: int) -> dict: + """ + Retrieve a specific task associated with a workflow by template ID. + """ + uri = f'{connection_object.base_url}/{cls.resourceUrl}/{workflowID}/tasks?template={templateID}' tasks = connection_object.api_call(uri, 'GET') - if len(tasks) == 0: + if not tasks: return None if len(tasks) > 1: raise Exception(f"Multiple tasks found for templateID: {templateID}") return tasks[0] - @staticmethod - def get_workflow_tasks_by_workflow_id(connection_object: XurrentApiHelper, id: int): - uri = f'{connection_object.base_url}/{Workflow.resourceUrl}/{id}/tasks' - return connection_object.api_call(uri, 'GET') + def update(self, data: dict): + """ + Update the current workflow instance with new data. + """ + if not self.id: + raise ValueError("Workflow instance must have an ID to update.") + uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + self.__update_object__(response) + return self @staticmethod - def update(connection_object: XurrentApiHelper, id, data): - uri = f'{connection_object.base_url}/{Workflow.resourceUrl}/{id}' - return connection_object.api_call(uri, 'PATCH', data) + def update_by_id(connection_object: XurrentApiHelper, id: int, data: dict) -> dict: + """ + Update a workflow by its ID. + """ + workflow = Workflow(connection_object, id) + return workflow.update(data) - @staticmethod - def delete(connection_object: XurrentApiHelper, id): - uri = f'{connection_object.base_url}/{Workflow.resourceUrl}/{id}' - return connection_object.api_call(uri, 'DELETE') + @classmethod + def create(cls, connection_object: XurrentApiHelper, data: dict): + """ + Create a new workflow. + """ + uri = f'{connection_object.base_url}/{cls.resourceUrl}' + response = connection_object.api_call(uri, 'POST', data) + return cls.from_data(connection_object, response) + + def close(self, note="closed.", completion_reason=WorkflowCompletionReason.complete): + """ + Close the current workflow instance. + """ + if not self.id: + raise ValueError("Workflow instance must have an ID to close.") + uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}' + response = self._connection_object.api_call(uri, 'PATCH', { + 'note': note, + 'manager_id': self._connection_object.api_user.id, + 'status': WorkflowStatus.completed, + 'completion_reason': completion_reason + }) + self.__update_object__(response) + return self + + def archive(self): + """ + Archive the current workflow instance. + """ + if not self.id: + raise ValueError("Workflow instance must have an ID to archive.") + uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}/archive' + response = self._connection_object.api_call(uri, 'POST') + self.__update_object__(response) + return self + + def trash(self): + """ + Trash the current workflow instance. + """ + if not self.id: + raise ValueError("Workflow instance must have an ID to trash.") + uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}/trash' + response = self._connection_object.api_call(uri, 'POST') + self.__update_object__(response) + return self + + def restore(self): + """ + Restore the current workflow instance. + """ + if not self.id: + raise ValueError("Workflow instance must have an ID to restore.") + uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}/restore' + response = self._connection_object.api_call(uri, 'POST') + self.__update_object__(response) + return self + + def __update_object__(self, data): + """ + Update the instance properties with new data. + """ + if data.get('id') != self.id: + raise ValueError(f"ID mismatch: {self.id} != {data.get('id')}") + for key, value in data.items(): + setattr(self, key, value)