Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# v0.0.2.1

## New Features
- Workflow: add base functionality for workflows
- People: add base functionality for people
- core: automatically get api user person object (optional, default: True)

# v0.0.2

## New Features
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

This module is used to interact with the Xurrent API. It provides a set of classes to interact with the API.

## Change Log

[ChangeLog.md](https://github.com/fasteiner/xurrent-python/blob/main/ChangeLog.md)

## Usage

Expand All @@ -19,6 +22,10 @@ This module is used to interact with the Xurrent API. It provides a set of class

x_api_helper = XurrentApiHelper(baseUrl, apitoken, account)

# Plain API Call
uri = "/requests?subject=Example Subject"
connection_object.api_call(uri, 'GET')

```

### Requests
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "xurrent"
version = "0.0.2"
version = "0.0.2.1"
authors = [
{ name="Fabian Steiner", email="fabian@stei-ner.net" },
]
Expand Down
16 changes: 11 additions & 5 deletions src/xurrent/core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations # Needed for forward references
import logging
import requests
import json
Expand Down Expand Up @@ -26,20 +27,25 @@ def to_json(self):


class XurrentApiHelper:
api_user: Person # Forward declaration with a string

def __init__(self, base_url, api_key, api_account):
def __init__(self, base_url, api_key, api_account, resolve_user=True):
self.base_url = base_url
self.api_key = api_key
self.api_account = api_account
self.logger = logging.getLogger(__name__)
if resolve_user:
# Import Person lazily
from .people import Person
self.api_user = Person.get_me(self)

def __append_per_page(self, uri, per_page=100):
"""
Append the 'per_page' parameter to the URI if not already present.
:param uri: URI to append the parameter to
:param per_page: Number of records per page
:return: URI with the 'per_page' parameter appended
>>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account')
>>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account', False)
>>> helper._XurrentApiHelper__append_per_page('https://api.example.com/tasks')
'https://api.example.com/tasks?per_page=100'
>>> helper._XurrentApiHelper__append_per_page('https://api.example.com/tasks?status=open')
Expand Down Expand Up @@ -138,7 +144,7 @@ def custom_fields_to_object(self, custom_fields):
:param custom_fields: List of custom fields
:return: Dictionary containing the custom fields

>>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account')
>>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account', False)
>>> helper.custom_fields_to_object([{'id': 'priority', 'value': 'high'}, {'id': 'status', 'value': 'open'}])
{'priority': 'high', 'status': 'open'}
"""
Expand All @@ -153,7 +159,7 @@ def object_to_custom_fields(self, obj):
:param obj: Dictionary to convert
:return: List of custom fields

>>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account')
>>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account', False)
>>> helper.object_to_custom_fields({'priority': 'high', 'status': 'open'})
[{'id': 'priority', 'value': 'high'}, {'id': 'status', 'value': 'open'}]
"""
Expand All @@ -167,7 +173,7 @@ def create_filter_string(self, filter: dict):
Create a filter string from a dictionary.
:param filter: Dictionary containing the filter parameters
:return: String containing the filter parameters
>>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account')
>>> helper = XurrentApiHelper('https://api.example.com', 'api_key', 'account', False)
>>> helper.create_filter_string({'status': 'open', 'priority': 'high'})
'status=open&priority=high'
>>> helper.create_filter_string({'status': 'open'})
Expand Down
74 changes: 74 additions & 0 deletions src/xurrent/people.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from .core import XurrentApiHelper
from typing import Optional, List, Dict, Type, TypeVar


T = TypeVar('T', bound='Person')

class Person():
#https://developer.4me.com/v1/people/
resourceUrl = 'people'

def __init__(self, connection_object: XurrentApiHelper, id, name: str = None, primary_email: str = None,**kwargs):
self._connection_object = connection_object
self.id = id
self.name = name
self.primary_email = primary_email
for key, value in kwargs.items():
setattr(self, key, value)

def __update_object__(self, data) -> None:
if data.get('id') != self.id:
raise ValueError(f"ID mismatch: {self.id} != {data.get('id')}")
for key, value in data.items():
setattr(self, key, value)

def __str__(self) -> str:
"""
Return a string representation of the object.
"""
return f"Person(id={self.id}, name={self.name}, primary_email={self.primary_email})"

@classmethod
def from_data(cls, connection_object: XurrentApiHelper, data) -> T:
if not isinstance(data, dict):
raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}")
if 'id' not in data:
raise ValueError("Data dictionary must contain an 'id' field.")
return cls(connection_object, **data)

@classmethod
def get_by_id(cls, connection_object: XurrentApiHelper, id):
uri = f'{connection_object.base_url}/{cls.resourceUrl}/{id}'
return cls.from_data(connection_object, connection_object.api_call(uri, 'GET'))

@classmethod
def get_me(cls, connection_object: XurrentApiHelper):
"""
Retrieve the person object for the authenticated user.
"""
uri = f'{connection_object.base_url}/{cls.resourceUrl}/me'
return cls.from_data(connection_object, connection_object.api_call(uri, 'GET'))

def update(self, data):
uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}'
response = self._connection_object.api_call(uri, 'PATCH', data)
self.__update_object__(response)
return self

def disable(self, prefix: str = '', postfix: str = ''):
"""
Disable the person and update the name.

:param prefix: Prefix to add to the name
:param postfix: Postfix to add to the name
:return: Updated person object
"""
uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}'
response = self._connection_object.api_call(uri, 'PATCH', {
'disabled': 'true',
"name": f"{prefix}{self.name}{postfix}"
})
self.__update_object__(response)
return self


4 changes: 2 additions & 2 deletions src/xurrent/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ def __str__(self) -> str:
def from_data(cls, connection_object: XurrentApiHelper, data) -> T:
if not isinstance(data, dict):
raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}")
if not 'id' in data:
data['id'] = id
if 'id' not in data:
raise ValueError("Data dictionary must contain an 'id' field.")
return cls(connection_object, **data)


Expand Down
4 changes: 0 additions & 4 deletions src/xurrent/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,3 @@ def update(connection_object: XurrentApiHelper, id, data):
uri = f'{connection_object.base_url}/{Task.resourceUrl}/{id}'
return connection_object.api_call(uri, 'PATCH', data)

@staticmethod
def delete(connection_object: XurrentApiHelper, id):
uri = f'{connection_object.base_url}/{Task.resourceUrl}/{id}'
return connection_object.api_call(uri, 'DELETE')
186 changes: 166 additions & 20 deletions src/xurrent/workflows.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,182 @@
from datetime import datetime
from typing import Optional, List, Dict
from .core import XurrentApiHelper
from enum import Enum

class WorkflowCompletionReason(str, Enum):
withdrawn = "withdrawn" # Withdrawn - Withdrawn by Requester
rejected = "rejected" # Rejected - Rejected by Approver
rolled_back = "rolled_back" # Rolled Back - Original Environment Restored
failed = "failed" # Failed - No Requirements Met
partial = "partial" # Partial - Not All Requirements Met
disruptive = "disruptive" # Disruptive - Caused Service Disruption
complete = "complete" # Complete - All Requirements Met

class Workflow():
#https://developer.4me.com/v1/workflows/
class WorkflowStatus(str, Enum):
being_created = "being_created" # Being Created
registered = "registered" # Registered
in_progress = "in_progress" # In Progress
progress_halted = "progress_halted" # Progress Halted
completed = "completed" # Completed
# risk_and_impact = "risk_and_impact" # Risk & Impact — deprecated: replaced by in_progress
# approval = "approval" # Approval — deprecated: replaced by in_progress
# implementation = "implementation" # Implementation — deprecated: replaced by in_progress


class WorkflowPredefinedFilter(str, Enum):
"""
Predefined filters for tasks.
"""
open = 'open' # /workflows/completed: List all completed workflows
completed = 'completed' #/workflows/open: List all open workflows
managed_by_me = 'managed_by_me' #/workflows/managed_by_me: List all workflows which manager is the API user


class Workflow:
# Endpoint for workflows
resourceUrl = 'workflows'

@staticmethod
def get_by_id(connection_object: XurrentApiHelper, id):
uri = f'{connection_object.base_url}/{Workflow.resourceUrl}/{id}'
def __init__(self,
connection_object: XurrentApiHelper,
id: int,
subject: Optional[str] = None,
status: Optional[str] = None,
created_at: Optional[datetime] = None,
updated_at: Optional[datetime] = None,
**kwargs):
self.id = id
self._connection_object = connection_object
self.subject = subject
self.status = status
self.created_at = created_at
self.updated_at = updated_at
for key, value in kwargs.items():
setattr(self, key, value)

def __str__(self) -> str:
"""Provide a human-readable string representation of the object."""
return (f"Workflow(id={self.id}, subject={self.subject}, status={self.status}, "
f"category={self.category}, impact={self.impact})")

@classmethod
def from_data(cls, connection_object: XurrentApiHelper, data: dict):
if not isinstance(data, dict):
raise TypeError(f"Expected 'data' to be a dictionary, got {type(data).__name__}")
if 'id' not in data:
raise ValueError("Data dictionary must contain an 'id' field.")
return cls(connection_object, **data)

@classmethod
def get_by_id(cls, connection_object: XurrentApiHelper, id: int) -> dict:
"""
Retrieve a workflow by its ID.
"""
uri = f'{connection_object.base_url}/{cls.resourceUrl}/{id}'
return cls.from_data(connection_object, connection_object.api_call(uri, 'GET'))

@classmethod
def get_workflow_tasks_by_workflow_id(cls, connection_object: XurrentApiHelper, id: int) -> List[dict]:
"""
Retrieve all tasks associated with a workflow by its ID.
"""
uri = f'{connection_object.base_url}/{cls.resourceUrl}/{id}/tasks'
return connection_object.api_call(uri, 'GET')

@staticmethod
def get_workflow_task_by_template_id(connection_object: XurrentApiHelper, workflowID: int, templateID: int):
uri = f'{connection_object.base_url}/{Workflow.resourceUrl}/{workflowID}/tasks?template={templateID}'
@classmethod
def get_workflow_task_by_template_id(cls, connection_object: XurrentApiHelper, workflowID: int, templateID: int) -> dict:
"""
Retrieve a specific task associated with a workflow by template ID.
"""
uri = f'{connection_object.base_url}/{cls.resourceUrl}/{workflowID}/tasks?template={templateID}'
tasks = connection_object.api_call(uri, 'GET')
if len(tasks) == 0:
if not tasks:
return None
if len(tasks) > 1:
raise Exception(f"Multiple tasks found for templateID: {templateID}")
return tasks[0]

@staticmethod
def get_workflow_tasks_by_workflow_id(connection_object: XurrentApiHelper, id: int):
uri = f'{connection_object.base_url}/{Workflow.resourceUrl}/{id}/tasks'
return connection_object.api_call(uri, 'GET')
def update(self, data: dict):
"""
Update the current workflow instance with new data.
"""
if not self.id:
raise ValueError("Workflow instance must have an ID to update.")
uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}'
response = self._connection_object.api_call(uri, 'PATCH', data)
self.__update_object__(response)
return self

@staticmethod
def update(connection_object: XurrentApiHelper, id, data):
uri = f'{connection_object.base_url}/{Workflow.resourceUrl}/{id}'
return connection_object.api_call(uri, 'PATCH', data)
def update_by_id(connection_object: XurrentApiHelper, id: int, data: dict) -> dict:
"""
Update a workflow by its ID.
"""
workflow = Workflow(connection_object, id)
return workflow.update(data)

@staticmethod
def delete(connection_object: XurrentApiHelper, id):
uri = f'{connection_object.base_url}/{Workflow.resourceUrl}/{id}'
return connection_object.api_call(uri, 'DELETE')
@classmethod
def create(cls, connection_object: XurrentApiHelper, data: dict):
"""
Create a new workflow.
"""
uri = f'{connection_object.base_url}/{cls.resourceUrl}'
response = connection_object.api_call(uri, 'POST', data)
return cls.from_data(connection_object, response)

def close(self, note="closed.", completion_reason=WorkflowCompletionReason.complete):
"""
Close the current workflow instance.
"""
if not self.id:
raise ValueError("Workflow instance must have an ID to close.")
uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}'
response = self._connection_object.api_call(uri, 'PATCH', {
'note': note,
'manager_id': self._connection_object.api_user.id,
'status': WorkflowStatus.completed,
'completion_reason': completion_reason
})
self.__update_object__(response)
return self

def archive(self):
"""
Archive the current workflow instance.
"""
if not self.id:
raise ValueError("Workflow instance must have an ID to archive.")
uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}/archive'
response = self._connection_object.api_call(uri, 'POST')
self.__update_object__(response)
return self

def trash(self):
"""
Trash the current workflow instance.
"""
if not self.id:
raise ValueError("Workflow instance must have an ID to trash.")
uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}/trash'
response = self._connection_object.api_call(uri, 'POST')
self.__update_object__(response)
return self

def restore(self):
"""
Restore the current workflow instance.
"""
if not self.id:
raise ValueError("Workflow instance must have an ID to restore.")
uri = f'{self._connection_object.base_url}/{self.resourceUrl}/{self.id}/restore'
response = self._connection_object.api_call(uri, 'POST')
self.__update_object__(response)
return self

def __update_object__(self, data):
"""
Update the instance properties with new data.
"""
if data.get('id') != self.id:
raise ValueError(f"ID mismatch: {self.id} != {data.get('id')}")
for key, value in data.items():
setattr(self, key, value)
Loading