|
| 1 | +import pytest |
| 2 | + |
| 3 | +from mpt_api_client.exceptions import MPTAPIError |
| 4 | +from mpt_api_client.rql.query_builder import RQLQuery |
| 5 | + |
| 6 | +pytestmark = [pytest.mark.flaky] |
| 7 | + |
| 8 | + |
| 9 | +@pytest.fixture |
| 10 | +async def created_journal_attachment( |
| 11 | + async_mpt_ops, journal_attachment_factory, billing_journal_id, pdf_fd |
| 12 | +): |
| 13 | + new_journal_attachment_request_data = journal_attachment_factory( |
| 14 | + name="E2E Created Journal Attachment", |
| 15 | + ) |
| 16 | + journal_attachments = async_mpt_ops.billing.journals.attachments(billing_journal_id) |
| 17 | + |
| 18 | + created_journal = await journal_attachments.create( |
| 19 | + new_journal_attachment_request_data, file=pdf_fd |
| 20 | + ) |
| 21 | + |
| 22 | + yield created_journal |
| 23 | + |
| 24 | + try: |
| 25 | + await journal_attachments.delete(created_journal.id) |
| 26 | + except MPTAPIError as error: |
| 27 | + print("TEARDOWN - Unable to delete journal attachment: %s", error.title) # noqa: WPS421 |
| 28 | + |
| 29 | + |
| 30 | +@pytest.fixture |
| 31 | +def journal_attachments(async_mpt_ops, billing_journal_id): |
| 32 | + return async_mpt_ops.billing.journals.attachments(billing_journal_id) |
| 33 | + |
| 34 | + |
| 35 | +async def test_get_journal_attachment_by_id(journal_attachments, journal_attachment_id): |
| 36 | + result = await journal_attachments.get(journal_attachment_id) |
| 37 | + |
| 38 | + assert result is not None |
| 39 | + |
| 40 | + |
| 41 | +async def test_get_journal_attachment_by_id_not_found( |
| 42 | + journal_attachments, invalid_journal_attachment_id |
| 43 | +): |
| 44 | + with pytest.raises(MPTAPIError, match=r"404 Not Found"): |
| 45 | + await journal_attachments.get(invalid_journal_attachment_id) |
| 46 | + |
| 47 | + |
| 48 | +async def test_list_journal_attachments(journal_attachments): |
| 49 | + limit = 10 |
| 50 | + |
| 51 | + result = await journal_attachments.fetch_page(limit=limit) |
| 52 | + |
| 53 | + assert len(result) > 0 |
| 54 | + |
| 55 | + |
| 56 | +async def test_filter_journal_attachments(journal_attachments, journal_attachment_id): |
| 57 | + select_fields = ["-description"] |
| 58 | + filtered_attachments = ( |
| 59 | + journal_attachments.filter(RQLQuery(id=journal_attachment_id)) |
| 60 | + .filter(RQLQuery(name="E2E Seeded Billing Journal Attachment")) |
| 61 | + .select(*select_fields) |
| 62 | + ) |
| 63 | + |
| 64 | + result = [filtered_attachment async for filtered_attachment in filtered_attachments.iterate()] |
| 65 | + |
| 66 | + assert len(result) == 1 |
| 67 | + |
| 68 | + |
| 69 | +def test_create_billing_journal_attachment(created_journal_attachment): |
| 70 | + result = created_journal_attachment |
| 71 | + |
| 72 | + assert result is not None |
| 73 | + |
| 74 | + |
| 75 | +async def test_update_journal_attachment(journal_attachments, created_journal_attachment): |
| 76 | + updated_name = "E2E Updated Journal Attachment Name" |
| 77 | + updated_attachment_data = { |
| 78 | + "name": updated_name, |
| 79 | + "description": updated_name, |
| 80 | + } |
| 81 | + |
| 82 | + result = await journal_attachments.update( |
| 83 | + created_journal_attachment.id, updated_attachment_data |
| 84 | + ) |
| 85 | + |
| 86 | + assert result is not None |
| 87 | + |
| 88 | + |
| 89 | +async def test_delete_journal_attachment(journal_attachments, created_journal_attachment): |
| 90 | + result = created_journal_attachment |
| 91 | + |
| 92 | + await journal_attachments.delete(result.id) |
| 93 | + |
| 94 | + |
| 95 | +async def test_download_journal_attachment(journal_attachments, journal_attachment_id): |
| 96 | + result = await journal_attachments.download(journal_attachment_id) |
| 97 | + |
| 98 | + assert result.file_contents is not None |
| 99 | + assert result.filename is not None |
0 commit comments