Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,47 @@
# v0.0.2.1
# Change Log

## v0.0.2.2

### New Features

- Task: add base functionality for tasks
- Workflow: add methods: get_task_by_template_id, get_tasks
- Workflow: update: add check for possible status values
- Request: add static method: update_by_id

### Bug Fixes

- Workflow: Fix toString / __str__ method

### Breaking Changes

- Request: renamed get_request to get_requests
- Workflow: get_workflow_task_by_template_id now returns a Task object List


## v0.0.2.1

### New Features

## New Features
- Workflow: add base functionality for workflows
- People: add base functionality for people
- core: automatically get api user person object (optional, default: True)

# v0.0.2
## v0.0.2

### New Features

## New Features
- Request: add methods: archive, trash, restore

## Bug Fixes
### Bug Fixes

- Request: Fix get_request, get_notes method


# v0.0.1
## v0.0.1

### New Features

## New Features
- Pagination: auto pagination get requests
- Retry-after: auto retry after 429 status code
- custom fields conversion (from and to object/dict)
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ This module is used to interact with the Xurrent API. It provides a set of class

```

### Workflows

```python

from xurrent.workflows import Workflow

workflow = Workflow.get_by_id(x_api_helper, <id>)

#close
workflow.close() # completion reason: completed, note: closed
# close with completion reason
workflow.close(completion_reason="withdrawn")
#close with completion reason and note
workflow.close(completion_reason="withdrawn", note="This is a test note")

```

### Tasks

```python
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "xurrent"
version = "0.0.2.1"
version = "0.0.2.2"
authors = [
{ name="Fabian Steiner", email="fabian@stei-ner.net" },
]
Expand Down
5 changes: 1 addition & 4 deletions src/xurrent/people.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,9 @@ def disable(self, prefix: str = '', postfix: str = ''):
:param postfix: Postfix to add to the name
:return: Updated person object
"""
uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}'
response = self._connection_object.api_call(uri, 'PATCH', {
return self.update({
'disabled': 'true',
"name": f"{prefix}{self.name}{postfix}"
})
self.__update_object__(response)
return self


14 changes: 13 additions & 1 deletion src/xurrent/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> T:
return cls.from_data(connection_object, response)

@classmethod
def get_request(cls, connection_object: XurrentApiHelper, predefinedFiler: PredefinedFilter = None,queryfilter: dict = None) -> List[T]:
def get_requests(cls, connection_object: XurrentApiHelper, predefinedFiler: PredefinedFilter = None,queryfilter: dict = None) -> List[T]:
"""
Retrieve a request by its ID.
:param connection_object: Instance of XurrentApiHelper
Expand Down Expand Up @@ -178,6 +178,18 @@ def update(self, data: dict):
self.__update_object__(response)
return self

@staticmethod
def update_by_id(connection_object: XurrentApiHelper, id: int, data: dict) -> T:
"""
Update a request by its ID.
:param connection_object: Instance of XurrentApiHelper
:param id: ID of the request to update
:param data: Dictionary containing updated data
:return: Response from the API call
"""
request = Request(connection_object, id)
return request.update(data)

def close(self, note: str, completion_reason: CompletionReason = CompletionReason.solved):
"""
Close the current request instance.
Expand Down
75 changes: 67 additions & 8 deletions src/xurrent/tasks.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,83 @@
from .core import XurrentApiHelper
from .workflows import Workflow
from enum import Enum
from typing import Optional, List, Dict, Type, TypeVar


T = TypeVar('T', bound='Task')

class TaskPredefinedFilter(str, Enum):
finished = "finished" # List all finished tasks
open = "open" # List all open tasks
managed_by_me = "managed_by_me" # List all tasks that are part of a workflow which manager is the API user
assigned_to_my_team = "assigned_to_my_team" # List all tasks that are assigned to one of the teams that the API user is a member of
assigned_to_me = "assigned_to_me" # List all tasks that are assigned to the API user
approval_by_me = "approval_by_me" # List all approval tasks that are assigned to the API user and which status is different from ‘Registered’





class Task():
#https://developer.4me.com/v1/tasks/
resourceUrl = 'tasks'

@staticmethod
def get_by_id(connection_object: XurrentApiHelper, id):
def __init__(self, connection_object: XurrentApiHelper, id, subject: str = None, workflow: dict = None,description: str = None, **kwargs):
self._connection_object = connection_object
self.id = id
self.subject = subject
self.workflow = workflow
for key, value in kwargs.items():
setattr(self, key, value)

def __update_object__(self, data) -> None:
if data.get('id') != self.id:
raise ValueError(f"ID mismatch: {self.id} != {data.get('id')}")
for key, value in data.items():
setattr(self, key, value)

def __str__(self) -> str:
"""
Return a string representation of the object.
"""
return f"Task(id={self.id}, subject={self.subject}, workflow={self.workflow})"

@classmethod
def from_data(cls, connection_object: XurrentApiHelper, data) -> T:
if not isinstance(data, dict):
raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}")
if 'id' not in data:
raise ValueError("Data dictionary must contain an 'id' field.")
return cls(connection_object, **data)

@classmethod
def get_by_id(cls, connection_object: XurrentApiHelper, id) -> T:
uri = f'{connection_object.base_url}/{Task.resourceUrl}/{id}'
return cls.from_data(connection_object, connection_object.api_call(uri, 'GET'))

@classmethod
def get_tasks(cls, connection_object: XurrentApiHelper, predefinedFilter: TaskPredefinedFilter = None, queryfilter: dict = None) -> List[T]:
uri = f'{connection_object.base_url}/{cls.resourceUrl}'
if predefinedFilter:
uri = f'{uri}/{predefinedFilter}'
if queryfilter:
uri = f'{uri}?{queryfilter}'
return connection_object.api_call(uri, 'GET')

def get_workflow_of_task(connection_object: XurrentApiHelper, id, expand: bool = False):
@staticmethod
def get_workflow_of_task(connection_object: XurrentApiHelper, id, expand: bool = False) -> Workflow:
task = Task.get_by_id(connection_object, id)
if expand:
return Workflow.get_by_id(connection_object, task['workflow'].id)
return task['workflow']
return Workflow.get_by_id(connection_object, task.workflow.id)
return Workflow.from_data(connection_object, task.workflow)

@staticmethod
def update(connection_object: XurrentApiHelper, id, data):
uri = f'{connection_object.base_url}/{Task.resourceUrl}/{id}'
return connection_object.api_call(uri, 'PATCH', data)
def update_by_id(connection_object: XurrentApiHelper, id, data) -> T:
task = Task(connection_object=connection_object, id=id)
return task.update(data)

def update(self, data) -> T:
uri = f'{connection_object.base_url}/{Task.resourceUrl}/{id}'
response = self._connection_object.api_call(uri, 'PATCH', data)
self.__update_object__(response)
return self
58 changes: 41 additions & 17 deletions src/xurrent/workflows.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations # Needed for forward references
from datetime import datetime
from typing import Optional, List, Dict
from .core import XurrentApiHelper
Expand All @@ -22,6 +23,15 @@ class WorkflowStatus(str, Enum):
# approval = "approval" # Approval — deprecated: replaced by in_progress
# implementation = "implementation" # Implementation — deprecated: replaced by in_progress

# Function to check if a value is a valid enum member
@classmethod
def is_valid_workflow_status(cls, value):
try:
cls[value]
return True
except KeyError:
return False


class WorkflowPredefinedFilter(str, Enum):
"""
Expand All @@ -41,22 +51,19 @@ def __init__(self,
id: int,
subject: Optional[str] = None,
status: Optional[str] = None,
created_at: Optional[datetime] = None,
updated_at: Optional[datetime] = None,
manager: Optional[Dict] = None,
**kwargs):
self.id = id
self._connection_object = connection_object
self.subject = subject
self.status = status
self.created_at = created_at
self.updated_at = updated_at
self.manager = manager
for key, value in kwargs.items():
setattr(self, key, value)

def __str__(self) -> str:
"""Provide a human-readable string representation of the object."""
return (f"Workflow(id={self.id}, subject={self.subject}, status={self.status}, "
f"category={self.category}, impact={self.impact})")
return (f"Workflow(id={self.id}, subject={self.subject}, status={self.status}, manager={self.manager}")

@classmethod
def from_data(cls, connection_object: XurrentApiHelper, data: dict):
Expand All @@ -75,25 +82,40 @@ def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> dict:
return cls.from_data(connection_object, connection_object.api_call(uri, 'GET'))

@classmethod
def get_workflow_tasks_by_workflow_id(cls, connection_object: XurrentApiHelper, id: int) -> List[dict]:
def get_workflow_tasks_by_workflow_id(cls, connection_object: XurrentApiHelper, id: int, queryfilter: dict = None) -> List[Task]:
"""
Retrieve all tasks associated with a workflow by its ID.
"""
uri = f'{connection_object.base_url}/{cls.resourceUrl}/{id}/tasks'
return connection_object.api_call(uri, 'GET')
workflow = Workflow(connection_object, id)
return workflow.get_tasks(queryfilter=queryfilter)

def get_tasks(self, queryfilter: dict = None) -> List[Task]:
"""
Retrieve all tasks associated with the current workflow instance.
"""
uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}/tasks'
if queryfilter:
uri += '?' + self._connection_object.create_filter_string(queryfilter)
response = self._connection_object.api_call(uri, 'GET')
from .tasks import Task
return [Task.from_data(self._connection_object, task) for task in response]

@classmethod
def get_workflow_task_by_template_id(cls, connection_object: XurrentApiHelper, workflowID: int, templateID: int) -> dict:
def get_workflow_task_by_template_id(cls, connection_object: XurrentApiHelper, workflowID: int, templateID: int) -> List[Task]:
"""
Retrieve a specific task associated with a workflow by template ID.
"""
uri = f'{connection_object.base_url}/{cls.resourceUrl}/{workflowID}/tasks?template={templateID}'
tasks = connection_object.api_call(uri, 'GET')
if not tasks:
return None
if len(tasks) > 1:
raise Exception(f"Multiple tasks found for templateID: {templateID}")
return tasks[0]
workflow = Workflow(connection_object, workflowID)
return workflow.get_task_by_template_id(templateID)

def get_task_by_template_id(self, templateID: int) -> List[Task]:
"""
Retrieve a specific task associated with the current workflow by template ID.
"""
return self.get_tasks(queryfilter={
'template': templateID
})


def update(self, data: dict):
"""
Expand All @@ -102,6 +124,8 @@ def update(self, data: dict):
if not self.id:
raise ValueError("Workflow instance must have an ID to update.")
uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}'
if not WorkflowStatus.is_valid_workflow_status(data.get('status')):
raise ValueError(f"Invalid status: {data.get('status')}")
response = self._connection_object.api_call(uri, 'PATCH', data)
self.__update_object__(response)
return self
Expand Down
Loading