From 1b47dd147d738a27b830e0a88caec65aef4899d4 Mon Sep 17 00:00:00 2001 From: "Ing. Fabian Franz Steiner BSc." Date: Fri, 6 Dec 2024 10:56:48 +0100 Subject: [PATCH 1/3] Add some documentation for workflow --- README.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/README.md b/README.md index 77b9958..a2ea1d6 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,23 @@ This module is used to interact with the Xurrent API. It provides a set of class ``` +### Workflows + +```python + + from xurrent.workflows import Workflow + + workflow = Workflow.get_by_id(x_api_helper, ) + + #close + workflow.close() # completion reason: completed, note: closed + # close with completion reason + workflow.close(completion_reason="withdrawn") + #close with completion reason and note + workflow.close(completion_reason="withdrawn", note="This is a test note") + +``` + ### Tasks ```python From 1a98f7e36a05b627a127491aaa92ed132449e4f7 Mon Sep 17 00:00:00 2001 From: "Ing. Fabian Franz Steiner BSc." Date: Fri, 6 Dec 2024 11:00:13 +0100 Subject: [PATCH 2/3] use update function for disable --- src/xurrent/people.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/xurrent/people.py b/src/xurrent/people.py index 73970bf..6c36c2e 100644 --- a/src/xurrent/people.py +++ b/src/xurrent/people.py @@ -63,12 +63,9 @@ def disable(self, prefix: str = '', postfix: str = ''): :param postfix: Postfix to add to the name :return: Updated person object """ - uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}' - response = self._connection_object.api_call(uri, 'PATCH', { + return self.update({ 'disabled': 'true', "name": f"{prefix}{self.name}{postfix}" }) - self.__update_object__(response) - return self \ No newline at end of file From 5850ea8afe262d7139cf14f3d27e13f62dd2e555 Mon Sep 17 00:00:00 2001 From: "Ing. Fabian Franz Steiner BSc." Date: Fri, 6 Dec 2024 14:59:35 +0100 Subject: [PATCH 3/3] Update changelog for version 0.0.2.2, add new features and bug fixes; rename get_request to get_requests and add update_by_id method --- ChangeLog.md | 39 +++++++++++++++++---- pyproject.toml | 2 +- src/xurrent/requests.py | 14 +++++++- src/xurrent/tasks.py | 75 +++++++++++++++++++++++++++++++++++----- src/xurrent/workflows.py | 58 ++++++++++++++++++++++--------- 5 files changed, 154 insertions(+), 34 deletions(-) diff --git a/ChangeLog.md b/ChangeLog.md index f9161a8..623edec 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,22 +1,47 @@ -# v0.0.2.1 +# Change Log + +## v0.0.2.2 + +### New Features + +- Task: add base functionality for tasks +- Workflow: add methods: get_task_by_template_id, get_tasks +- Workflow: update: add check for possible status values +- Request: add static method: update_by_id + +### Bug Fixes + +- Workflow: Fix toString / __str__ method + +### Breaking Changes + +- Request: renamed get_request to get_requests +- Workflow: get_workflow_task_by_template_id now returns a Task object List + + +## v0.0.2.1 + +### New Features -## New Features - Workflow: add base functionality for workflows - People: add base functionality for people - core: automatically get api user person object (optional, default: True) -# v0.0.2 +## v0.0.2 + +### New Features -## New Features - Request: add methods: archive, trash, restore -## Bug Fixes +### Bug Fixes + - Request: Fix get_request, get_notes method -# v0.0.1 +## v0.0.1 + +### New Features -## New Features - Pagination: auto pagination get requests - Retry-after: auto retry after 429 status code - custom fields conversion (from and to object/dict) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index b0e31b0..86a5ab7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "xurrent" -version = "0.0.2.1" +version = "0.0.2.2" authors = [ { name="Fabian Steiner", email="fabian@stei-ner.net" }, ] diff --git a/src/xurrent/requests.py b/src/xurrent/requests.py index 26c7c9a..85cbb3d 100644 --- a/src/xurrent/requests.py +++ b/src/xurrent/requests.py @@ -111,7 +111,7 @@ def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T: return cls.from_data(connection_object, response) @classmethod - def get_request(cls, connection_object: XurrentApiHelper, predefinedFiler: PredefinedFilter = None,queryfilter: dict = None) -> List[T]: + def get_requests(cls, connection_object: XurrentApiHelper, predefinedFiler: PredefinedFilter = None,queryfilter: dict = None) -> List[T]: """ Retrieve a request by its ID. :param connection_object: Instance of XurrentApiHelper @@ -178,6 +178,18 @@ def update(self, data: dict): self.__update_object__(response) return self + @staticmethod + def update_by_id(connection_object: XurrentApiHelper, id: int, data: dict) -> T: + """ + Update a request by its ID. + :param connection_object: Instance of XurrentApiHelper + :param id: ID of the request to update + :param data: Dictionary containing updated data + :return: Response from the API call + """ + request = Request(connection_object, id) + return request.update(data) + def close(self, note: str, completion_reason: CompletionReason = CompletionReason.solved): """ Close the current request instance. diff --git a/src/xurrent/tasks.py b/src/xurrent/tasks.py index 0a3ed3a..7ba48a2 100644 --- a/src/xurrent/tasks.py +++ b/src/xurrent/tasks.py @@ -1,24 +1,83 @@ from .core import XurrentApiHelper from .workflows import Workflow +from enum import Enum +from typing import Optional, List, Dict, Type, TypeVar + + +T = TypeVar('T', bound='Task') + +class TaskPredefinedFilter(str, Enum): + finished = "finished" # List all finished tasks + open = "open" # List all open tasks + managed_by_me = "managed_by_me" # List all tasks that are part of a workflow which manager is the API user + assigned_to_my_team = "assigned_to_my_team" # List all tasks that are assigned to one of the teams that the API user is a member of + assigned_to_me = "assigned_to_me" # List all tasks that are assigned to the API user + approval_by_me = "approval_by_me" # List all approval tasks that are assigned to the API user and which status is different from ‘Registered’ + + + class Task(): #https://developer.4me.com/v1/tasks/ resourceUrl = 'tasks' - @staticmethod - def get_by_id(connection_object: XurrentApiHelper, id): + def __init__(self, connection_object: XurrentApiHelper, id, subject: str = None, workflow: dict = None,description: str = None, **kwargs): + self._connection_object = connection_object + self.id = id + self.subject = subject + self.workflow = workflow + for key, value in kwargs.items(): + setattr(self, key, value) + + def __update_object__(self, data) -> None: + if data.get('id') != self.id: + raise ValueError(f"ID mismatch: {self.id} != {data.get('id')}") + for key, value in data.items(): + setattr(self, key, value) + + def __str__(self) -> str: + """ + Return a string representation of the object. + """ + return f"Task(id={self.id}, subject={self.subject}, workflow={self.workflow})" + + @classmethod + def from_data(cls, connection_object: XurrentApiHelper, data) -> T: + if not isinstance(data, dict): + raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}") + if 'id' not in data: + raise ValueError("Data dictionary must contain an 'id' field.") + return cls(connection_object, **data) + + @classmethod + def get_by_id(cls, connection_object: XurrentApiHelper, id) -> T: uri = f'{connection_object.base_url}/{Task.resourceUrl}/{id}' + return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) + + @classmethod + def get_tasks(cls, connection_object: XurrentApiHelper, predefinedFilter: TaskPredefinedFilter = None, queryfilter: dict = None) -> List[T]: + uri = f'{connection_object.base_url}/{cls.resourceUrl}' + if predefinedFilter: + uri = f'{uri}/{predefinedFilter}' + if queryfilter: + uri = f'{uri}?{queryfilter}' return connection_object.api_call(uri, 'GET') - def get_workflow_of_task(connection_object: XurrentApiHelper, id, expand: bool = False): + @staticmethod + def get_workflow_of_task(connection_object: XurrentApiHelper, id, expand: bool = False) -> Workflow: task = Task.get_by_id(connection_object, id) if expand: - return Workflow.get_by_id(connection_object, task['workflow'].id) - return task['workflow'] + return Workflow.get_by_id(connection_object, task.workflow.id) + return Workflow.from_data(connection_object, task.workflow) @staticmethod - def update(connection_object: XurrentApiHelper, id, data): - uri = f'{connection_object.base_url}/{Task.resourceUrl}/{id}' - return connection_object.api_call(uri, 'PATCH', data) + def update_by_id(connection_object: XurrentApiHelper, id, data) -> T: + task = Task(connection_object=connection_object, id=id) + return task.update(data) + def update(self, data) -> T: + uri = f'{connection_object.base_url}/{Task.resourceUrl}/{id}' + response = self._connection_object.api_call(uri, 'PATCH', data) + self.__update_object__(response) + return self diff --git a/src/xurrent/workflows.py b/src/xurrent/workflows.py index c349eeb..004aebd 100644 --- a/src/xurrent/workflows.py +++ b/src/xurrent/workflows.py @@ -1,3 +1,4 @@ +from __future__ import annotations # Needed for forward references from datetime import datetime from typing import Optional, List, Dict from .core import XurrentApiHelper @@ -22,6 +23,15 @@ class WorkflowStatus(str, Enum): # approval = "approval" # Approval — deprecated: replaced by in_progress # implementation = "implementation" # Implementation — deprecated: replaced by in_progress + # Function to check if a value is a valid enum member + @classmethod + def is_valid_workflow_status(cls, value): + try: + cls[value] + return True + except KeyError: + return False + class WorkflowPredefinedFilter(str, Enum): """ @@ -41,22 +51,19 @@ def __init__(self, id: int, subject: Optional[str] = None, status: Optional[str] = None, - created_at: Optional[datetime] = None, - updated_at: Optional[datetime] = None, + manager: Optional[Dict] = None, **kwargs): self.id = id self._connection_object = connection_object self.subject = subject self.status = status - self.created_at = created_at - self.updated_at = updated_at + self.manager = manager for key, value in kwargs.items(): setattr(self, key, value) def __str__(self) -> str: """Provide a human-readable string representation of the object.""" - return (f"Workflow(id={self.id}, subject={self.subject}, status={self.status}, " - f"category={self.category}, impact={self.impact})") + return (f"Workflow(id={self.id}, subject={self.subject}, status={self.status}, manager={self.manager}") @classmethod def from_data(cls, connection_object: XurrentApiHelper, data: dict): @@ -75,25 +82,40 @@ def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> dict: return cls.from_data(connection_object, connection_object.api_call(uri, 'GET')) @classmethod - def get_workflow_tasks_by_workflow_id(cls, connection_object: XurrentApiHelper, id: int) -> List[dict]: + def get_workflow_tasks_by_workflow_id(cls, connection_object: XurrentApiHelper, id: int, queryfilter: dict = None) -> List[Task]: """ Retrieve all tasks associated with a workflow by its ID. """ - uri = f'{connection_object.base_url}/{cls.resourceUrl}/{id}/tasks' - return connection_object.api_call(uri, 'GET') + workflow = Workflow(connection_object, id) + return workflow.get_tasks(queryfilter=queryfilter) + + def get_tasks(self, queryfilter: dict = None) -> List[Task]: + """ + Retrieve all tasks associated with the current workflow instance. + """ + uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}/tasks' + if queryfilter: + uri += '?' + self._connection_object.create_filter_string(queryfilter) + response = self._connection_object.api_call(uri, 'GET') + from .tasks import Task + return [Task.from_data(self._connection_object, task) for task in response] @classmethod - def get_workflow_task_by_template_id(cls, connection_object: XurrentApiHelper, workflowID: int, templateID: int) -> dict: + def get_workflow_task_by_template_id(cls, connection_object: XurrentApiHelper, workflowID: int, templateID: int) -> List[Task]: """ Retrieve a specific task associated with a workflow by template ID. """ - uri = f'{connection_object.base_url}/{cls.resourceUrl}/{workflowID}/tasks?template={templateID}' - tasks = connection_object.api_call(uri, 'GET') - if not tasks: - return None - if len(tasks) > 1: - raise Exception(f"Multiple tasks found for templateID: {templateID}") - return tasks[0] + workflow = Workflow(connection_object, workflowID) + return workflow.get_task_by_template_id(templateID) + + def get_task_by_template_id(self, templateID: int) -> List[Task]: + """ + Retrieve a specific task associated with the current workflow by template ID. + """ + return self.get_tasks(queryfilter={ + 'template': templateID + }) + def update(self, data: dict): """ @@ -102,6 +124,8 @@ def update(self, data: dict): if not self.id: raise ValueError("Workflow instance must have an ID to update.") uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}' + if not WorkflowStatus.is_valid_workflow_status(data.get('status')): + raise ValueError(f"Invalid status: {data.get('status')}") response = self._connection_object.api_call(uri, 'PATCH', data) self.__update_object__(response) return self