From 6c14f3afac0ea28dba24fe8783e7c1f09c03b31f Mon Sep 17 00:00:00 2001 From: Alexander Piskun <13381981+bigcat88@users.noreply.github.com> Date: Wed, 29 Oct 2025 20:14:56 +0200 Subject: [PATCH 1/5] use new API client in Luma and Minimax nodes (#10528) --- comfy_api_nodes/apinode_utils.py | 81 ------ comfy_api_nodes/apis/minimax_api.py | 120 ++++++++ comfy_api_nodes/nodes_ideogram.py | 2 +- comfy_api_nodes/nodes_luma.py | 356 ++++++----------------- comfy_api_nodes/nodes_minimax.py | 241 +++++---------- comfy_api_nodes/util/client.py | 2 +- comfy_api_nodes/util/download_helpers.py | 3 +- 7 files changed, 286 insertions(+), 519 deletions(-) create mode 100644 comfy_api_nodes/apis/minimax_api.py diff --git a/comfy_api_nodes/apinode_utils.py b/comfy_api_nodes/apinode_utils.py index 4182c8f80b7f..6a72b9d1d759 100644 --- a/comfy_api_nodes/apinode_utils.py +++ b/comfy_api_nodes/apinode_utils.py @@ -3,14 +3,6 @@ import mimetypes from typing import Optional, Union from comfy.utils import common_upscale -from comfy_api_nodes.apis.client import ( - ApiClient, - ApiEndpoint, - HttpMethod, - SynchronousOperation, - UploadRequest, - UploadResponse, -) from server import PromptServer from comfy.cli_args import args @@ -19,7 +11,6 @@ import torch import math import base64 -from .util import tensor_to_bytesio, bytesio_to_image_tensor from io import BytesIO @@ -148,11 +139,6 @@ async def download_url_to_bytesio( return BytesIO(await resp.read()) -def process_image_response(response_content: bytes | str) -> torch.Tensor: - """Uses content from a Response object and converts it to a torch.Tensor""" - return bytesio_to_image_tensor(BytesIO(response_content)) - - def text_filepath_to_base64_string(filepath: str) -> str: """Converts a text file to a base64 string.""" with open(filepath, "rb") as f: @@ -169,73 +155,6 @@ def text_filepath_to_data_uri(filepath: str) -> str: return f"data:{mime_type};base64,{base64_string}" -async def upload_file_to_comfyapi( - file_bytes_io: BytesIO, - filename: str, - upload_mime_type: Optional[str], - auth_kwargs: Optional[dict[str, str]] = None, -) -> str: - """ - Uploads a single file to ComfyUI API and returns its download URL. - - Args: - file_bytes_io: BytesIO object containing the file data. - filename: The filename of the file. - upload_mime_type: MIME type of the file. - auth_kwargs: Optional authentication token(s). - - Returns: - The download URL for the uploaded file. - """ - if upload_mime_type is None: - request_object = UploadRequest(file_name=filename) - else: - request_object = UploadRequest(file_name=filename, content_type=upload_mime_type) - operation = SynchronousOperation( - endpoint=ApiEndpoint( - path="/customers/storage", - method=HttpMethod.POST, - request_model=UploadRequest, - response_model=UploadResponse, - ), - request=request_object, - auth_kwargs=auth_kwargs, - ) - - response: UploadResponse = await operation.execute() - await ApiClient.upload_file(response.upload_url, file_bytes_io, content_type=upload_mime_type) - return response.download_url - - -async def upload_images_to_comfyapi( - image: torch.Tensor, - max_images=8, - auth_kwargs: Optional[dict[str, str]] = None, - mime_type: Optional[str] = None, -) -> list[str]: - """ - Uploads images to ComfyUI API and returns download URLs. - To upload multiple images, stack them in the batch dimension first. - - Args: - image: Input torch.Tensor image. - max_images: Maximum number of images to upload. - auth_kwargs: Optional authentication token(s). - mime_type: Optional MIME type for the image. - """ - # if batch, try to upload each file if max_images is greater than 0 - download_urls: list[str] = [] - is_batch = len(image.shape) > 3 - batch_len = image.shape[0] if is_batch else 1 - - for idx in range(min(batch_len, max_images)): - tensor = image[idx] if is_batch else image - img_io = tensor_to_bytesio(tensor, mime_type=mime_type) - url = await upload_file_to_comfyapi(img_io, img_io.name, mime_type, auth_kwargs) - download_urls.append(url) - return download_urls - - def resize_mask_to_image( mask: torch.Tensor, image: torch.Tensor, diff --git a/comfy_api_nodes/apis/minimax_api.py b/comfy_api_nodes/apis/minimax_api.py new file mode 100644 index 000000000000..d747e177a165 --- /dev/null +++ b/comfy_api_nodes/apis/minimax_api.py @@ -0,0 +1,120 @@ +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + + +class MinimaxBaseResponse(BaseModel): + status_code: int = Field( + ..., + description='Status code. 0 indicates success, other values indicate errors.', + ) + status_msg: str = Field( + ..., description='Specific error details or success message.' + ) + + +class File(BaseModel): + bytes: Optional[int] = Field(None, description='File size in bytes') + created_at: Optional[int] = Field( + None, description='Unix timestamp when the file was created, in seconds' + ) + download_url: Optional[str] = Field( + None, description='The URL to download the video' + ) + backup_download_url: Optional[str] = Field( + None, description='The backup URL to download the video' + ) + + file_id: Optional[int] = Field(None, description='Unique identifier for the file') + filename: Optional[str] = Field(None, description='The name of the file') + purpose: Optional[str] = Field(None, description='The purpose of using the file') + + +class MinimaxFileRetrieveResponse(BaseModel): + base_resp: MinimaxBaseResponse + file: File + + +class MiniMaxModel(str, Enum): + T2V_01_Director = 'T2V-01-Director' + I2V_01_Director = 'I2V-01-Director' + S2V_01 = 'S2V-01' + I2V_01 = 'I2V-01' + I2V_01_live = 'I2V-01-live' + T2V_01 = 'T2V-01' + Hailuo_02 = 'MiniMax-Hailuo-02' + + +class Status6(str, Enum): + Queueing = 'Queueing' + Preparing = 'Preparing' + Processing = 'Processing' + Success = 'Success' + Fail = 'Fail' + + +class MinimaxTaskResultResponse(BaseModel): + base_resp: MinimaxBaseResponse + file_id: Optional[str] = Field( + None, + description='After the task status changes to Success, this field returns the file ID corresponding to the generated video.', + ) + status: Status6 = Field( + ..., + description="Task status: 'Queueing' (in queue), 'Preparing' (task is preparing), 'Processing' (generating), 'Success' (task completed successfully), or 'Fail' (task failed).", + ) + task_id: str = Field(..., description='The task ID being queried.') + + +class SubjectReferenceItem(BaseModel): + image: Optional[str] = Field( + None, description='URL or base64 encoding of the subject reference image.' + ) + mask: Optional[str] = Field( + None, + description='URL or base64 encoding of the mask for the subject reference image.', + ) + + +class MinimaxVideoGenerationRequest(BaseModel): + callback_url: Optional[str] = Field( + None, + description='Optional. URL to receive real-time status updates about the video generation task.', + ) + first_frame_image: Optional[str] = Field( + None, + description='URL or base64 encoding of the first frame image. Required when model is I2V-01, I2V-01-Director, or I2V-01-live.', + ) + model: MiniMaxModel = Field( + ..., + description='Required. ID of model. Options: T2V-01-Director, I2V-01-Director, S2V-01, I2V-01, I2V-01-live, T2V-01', + ) + prompt: Optional[str] = Field( + None, + description='Description of the video. Should be less than 2000 characters. Supports camera movement instructions in [brackets].', + max_length=2000, + ) + prompt_optimizer: Optional[bool] = Field( + True, + description='If true (default), the model will automatically optimize the prompt. Set to false for more precise control.', + ) + subject_reference: Optional[list[SubjectReferenceItem]] = Field( + None, + description='Only available when model is S2V-01. The model will generate a video based on the subject uploaded through this parameter.', + ) + duration: Optional[int] = Field( + None, + description="The length of the output video in seconds." + ) + resolution: Optional[str] = Field( + None, + description="The dimensions of the video display. 1080p corresponds to 1920 x 1080 pixels, 768p corresponds to 1366 x 768 pixels." + ) + + +class MinimaxVideoGenerationResponse(BaseModel): + base_resp: MinimaxBaseResponse + task_id: str = Field( + ..., description='The task ID for the asynchronous video generation task.' + ) diff --git a/comfy_api_nodes/nodes_ideogram.py b/comfy_api_nodes/nodes_ideogram.py index 9eae5f11aec4..d8fd3378bbd8 100644 --- a/comfy_api_nodes/nodes_ideogram.py +++ b/comfy_api_nodes/nodes_ideogram.py @@ -20,9 +20,9 @@ from comfy_api_nodes.apinode_utils import ( download_url_to_bytesio, - bytesio_to_image_tensor, resize_mask_to_image, ) +from comfy_api_nodes.util import bytesio_to_image_tensor from server import PromptServer V1_V1_RES_MAP = { diff --git a/comfy_api_nodes/nodes_luma.py b/comfy_api_nodes/nodes_luma.py index e74441e5ef5f..894f2b08c994 100644 --- a/comfy_api_nodes/nodes_luma.py +++ b/comfy_api_nodes/nodes_luma.py @@ -1,69 +1,51 @@ -from __future__ import annotations -from inspect import cleandoc from typing import Optional + +import torch from typing_extensions import override -from comfy_api.latest import ComfyExtension, IO -from comfy_api.input_impl.video_types import VideoFromFile + +from comfy_api.latest import IO, ComfyExtension from comfy_api_nodes.apis.luma_api import ( - LumaImageModel, - LumaVideoModel, - LumaVideoOutputResolution, - LumaVideoModelOutputDuration, LumaAspectRatio, - LumaState, - LumaImageGenerationRequest, - LumaGenerationRequest, - LumaGeneration, LumaCharacterRef, - LumaModifyImageRef, + LumaConceptChain, + LumaGeneration, + LumaGenerationRequest, + LumaImageGenerationRequest, LumaImageIdentity, - LumaReference, - LumaReferenceChain, + LumaImageModel, LumaImageReference, - LumaKeyframes, - LumaConceptChain, LumaIO, + LumaKeyframes, + LumaModifyImageRef, + LumaReference, + LumaReferenceChain, + LumaVideoModel, + LumaVideoModelOutputDuration, + LumaVideoOutputResolution, get_luma_concepts, ) -from comfy_api_nodes.apis.client import ( +from comfy_api_nodes.util import ( ApiEndpoint, - HttpMethod, - SynchronousOperation, - PollingOperation, - EmptyRequest, -) -from comfy_api_nodes.apinode_utils import ( + download_url_to_image_tensor, + download_url_to_video_output, + poll_op, + sync_op, upload_images_to_comfyapi, - process_image_response, + validate_string, ) -from server import PromptServer -from comfy_api_nodes.util import validate_string - -import aiohttp -import torch -from io import BytesIO LUMA_T2V_AVERAGE_DURATION = 105 LUMA_I2V_AVERAGE_DURATION = 100 -def image_result_url_extractor(response: LumaGeneration): - return response.assets.image if hasattr(response, "assets") and hasattr(response.assets, "image") else None - -def video_result_url_extractor(response: LumaGeneration): - return response.assets.video if hasattr(response, "assets") and hasattr(response.assets, "video") else None class LumaReferenceNode(IO.ComfyNode): - """ - Holds an image and weight for use with Luma Generate Image node. - """ - @classmethod def define_schema(cls) -> IO.Schema: return IO.Schema( node_id="LumaReferenceNode", display_name="Luma Reference", category="api node/image/Luma", - description=cleandoc(cls.__doc__ or ""), + description="Holds an image and weight for use with Luma Generate Image node.", inputs=[ IO.Image.Input( "image", @@ -83,17 +65,10 @@ def define_schema(cls) -> IO.Schema: ), ], outputs=[IO.Custom(LumaIO.LUMA_REF).Output(display_name="luma_ref")], - hidden=[ - IO.Hidden.auth_token_comfy_org, - IO.Hidden.api_key_comfy_org, - IO.Hidden.unique_id, - ], ) @classmethod - def execute( - cls, image: torch.Tensor, weight: float, luma_ref: LumaReferenceChain = None - ) -> IO.NodeOutput: + def execute(cls, image: torch.Tensor, weight: float, luma_ref: LumaReferenceChain = None) -> IO.NodeOutput: if luma_ref is not None: luma_ref = luma_ref.clone() else: @@ -103,17 +78,13 @@ def execute( class LumaConceptsNode(IO.ComfyNode): - """ - Holds one or more Camera Concepts for use with Luma Text to Video and Luma Image to Video nodes. - """ - @classmethod def define_schema(cls) -> IO.Schema: return IO.Schema( node_id="LumaConceptsNode", display_name="Luma Concepts", category="api node/video/Luma", - description=cleandoc(cls.__doc__ or ""), + description="Camera Concepts for use with Luma Text to Video and Luma Image to Video nodes.", inputs=[ IO.Combo.Input( "concept1", @@ -138,11 +109,6 @@ def define_schema(cls) -> IO.Schema: ), ], outputs=[IO.Custom(LumaIO.LUMA_CONCEPTS).Output(display_name="luma_concepts")], - hidden=[ - IO.Hidden.auth_token_comfy_org, - IO.Hidden.api_key_comfy_org, - IO.Hidden.unique_id, - ], ) @classmethod @@ -161,17 +127,13 @@ def execute( class LumaImageGenerationNode(IO.ComfyNode): - """ - Generates images synchronously based on prompt and aspect ratio. - """ - @classmethod def define_schema(cls) -> IO.Schema: return IO.Schema( node_id="LumaImageNode", display_name="Luma Text to Image", category="api node/image/Luma", - description=cleandoc(cls.__doc__ or ""), + description="Generates images synchronously based on prompt and aspect ratio.", inputs=[ IO.String.Input( "prompt", @@ -237,45 +199,30 @@ async def execute( aspect_ratio: str, seed, style_image_weight: float, - image_luma_ref: LumaReferenceChain = None, - style_image: torch.Tensor = None, - character_image: torch.Tensor = None, + image_luma_ref: Optional[LumaReferenceChain] = None, + style_image: Optional[torch.Tensor] = None, + character_image: Optional[torch.Tensor] = None, ) -> IO.NodeOutput: validate_string(prompt, strip_whitespace=True, min_length=3) - auth_kwargs = { - "auth_token": cls.hidden.auth_token_comfy_org, - "comfy_api_key": cls.hidden.api_key_comfy_org, - } # handle image_luma_ref api_image_ref = None if image_luma_ref is not None: - api_image_ref = await cls._convert_luma_refs( - image_luma_ref, max_refs=4, auth_kwargs=auth_kwargs, - ) + api_image_ref = await cls._convert_luma_refs(image_luma_ref, max_refs=4) # handle style_luma_ref api_style_ref = None if style_image is not None: - api_style_ref = await cls._convert_style_image( - style_image, weight=style_image_weight, auth_kwargs=auth_kwargs, - ) + api_style_ref = await cls._convert_style_image(style_image, weight=style_image_weight) # handle character_ref images character_ref = None if character_image is not None: - download_urls = await upload_images_to_comfyapi( - character_image, max_images=4, auth_kwargs=auth_kwargs, - ) - character_ref = LumaCharacterRef( - identity0=LumaImageIdentity(images=download_urls) - ) - - operation = SynchronousOperation( - endpoint=ApiEndpoint( - path="/proxy/luma/generations/image", - method=HttpMethod.POST, - request_model=LumaImageGenerationRequest, - response_model=LumaGeneration, - ), - request=LumaImageGenerationRequest( + download_urls = await upload_images_to_comfyapi(cls, character_image, max_images=4) + character_ref = LumaCharacterRef(identity0=LumaImageIdentity(images=download_urls)) + + response_api = await sync_op( + cls, + ApiEndpoint(path="/proxy/luma/generations/image", method="POST"), + response_model=LumaGeneration, + data=LumaImageGenerationRequest( prompt=prompt, model=model, aspect_ratio=aspect_ratio, @@ -283,41 +230,21 @@ async def execute( style_ref=api_style_ref, character_ref=character_ref, ), - auth_kwargs=auth_kwargs, ) - response_api: LumaGeneration = await operation.execute() - - operation = PollingOperation( - poll_endpoint=ApiEndpoint( - path=f"/proxy/luma/generations/{response_api.id}", - method=HttpMethod.GET, - request_model=EmptyRequest, - response_model=LumaGeneration, - ), - completed_statuses=[LumaState.completed], - failed_statuses=[LumaState.failed], + response_poll = await poll_op( + cls, + ApiEndpoint(path=f"/proxy/luma/generations/{response_api.id}"), + response_model=LumaGeneration, status_extractor=lambda x: x.state, - result_url_extractor=image_result_url_extractor, - node_id=cls.hidden.unique_id, - auth_kwargs=auth_kwargs, ) - response_poll = await operation.execute() - - async with aiohttp.ClientSession() as session: - async with session.get(response_poll.assets.image) as img_response: - img = process_image_response(await img_response.content.read()) - return IO.NodeOutput(img) + return IO.NodeOutput(await download_url_to_image_tensor(response_poll.assets.image)) @classmethod - async def _convert_luma_refs( - cls, luma_ref: LumaReferenceChain, max_refs: int, auth_kwargs: Optional[dict[str,str]] = None - ): + async def _convert_luma_refs(cls, luma_ref: LumaReferenceChain, max_refs: int): luma_urls = [] ref_count = 0 for ref in luma_ref.refs: - download_urls = await upload_images_to_comfyapi( - ref.image, max_images=1, auth_kwargs=auth_kwargs - ) + download_urls = await upload_images_to_comfyapi(cls, ref.image, max_images=1) luma_urls.append(download_urls[0]) ref_count += 1 if ref_count >= max_refs: @@ -325,27 +252,19 @@ async def _convert_luma_refs( return luma_ref.create_api_model(download_urls=luma_urls, max_refs=max_refs) @classmethod - async def _convert_style_image( - cls, style_image: torch.Tensor, weight: float, auth_kwargs: Optional[dict[str,str]] = None - ): - chain = LumaReferenceChain( - first_ref=LumaReference(image=style_image, weight=weight) - ) - return await cls._convert_luma_refs(chain, max_refs=1, auth_kwargs=auth_kwargs) + async def _convert_style_image(cls, style_image: torch.Tensor, weight: float): + chain = LumaReferenceChain(first_ref=LumaReference(image=style_image, weight=weight)) + return await cls._convert_luma_refs(chain, max_refs=1) class LumaImageModifyNode(IO.ComfyNode): - """ - Modifies images synchronously based on prompt and aspect ratio. - """ - @classmethod def define_schema(cls) -> IO.Schema: return IO.Schema( node_id="LumaImageModifyNode", display_name="Luma Image to Image", category="api node/image/Luma", - description=cleandoc(cls.__doc__ or ""), + description="Modifies images synchronously based on prompt and aspect ratio.", inputs=[ IO.Image.Input( "image", @@ -395,68 +314,37 @@ async def execute( image_weight: float, seed, ) -> IO.NodeOutput: - auth_kwargs = { - "auth_token": cls.hidden.auth_token_comfy_org, - "comfy_api_key": cls.hidden.api_key_comfy_org, - } - # first, upload image - download_urls = await upload_images_to_comfyapi( - image, max_images=1, auth_kwargs=auth_kwargs, - ) + download_urls = await upload_images_to_comfyapi(cls, image, max_images=1) image_url = download_urls[0] - # next, make Luma call with download url provided - operation = SynchronousOperation( - endpoint=ApiEndpoint( - path="/proxy/luma/generations/image", - method=HttpMethod.POST, - request_model=LumaImageGenerationRequest, - response_model=LumaGeneration, - ), - request=LumaImageGenerationRequest( + response_api = await sync_op( + cls, + ApiEndpoint(path="/proxy/luma/generations/image", method="POST"), + response_model=LumaGeneration, + data=LumaImageGenerationRequest( prompt=prompt, model=model, modify_image_ref=LumaModifyImageRef( - url=image_url, weight=round(max(min(1.0-image_weight, 0.98), 0.0), 2) + url=image_url, weight=round(max(min(1.0 - image_weight, 0.98), 0.0), 2) ), ), - auth_kwargs=auth_kwargs, ) - response_api: LumaGeneration = await operation.execute() - - operation = PollingOperation( - poll_endpoint=ApiEndpoint( - path=f"/proxy/luma/generations/{response_api.id}", - method=HttpMethod.GET, - request_model=EmptyRequest, - response_model=LumaGeneration, - ), - completed_statuses=[LumaState.completed], - failed_statuses=[LumaState.failed], + response_poll = await poll_op( + cls, + ApiEndpoint(path=f"/proxy/luma/generations/{response_api.id}"), + response_model=LumaGeneration, status_extractor=lambda x: x.state, - result_url_extractor=image_result_url_extractor, - node_id=cls.hidden.unique_id, - auth_kwargs=auth_kwargs, ) - response_poll = await operation.execute() - - async with aiohttp.ClientSession() as session: - async with session.get(response_poll.assets.image) as img_response: - img = process_image_response(await img_response.content.read()) - return IO.NodeOutput(img) + return IO.NodeOutput(await download_url_to_image_tensor(response_poll.assets.image)) class LumaTextToVideoGenerationNode(IO.ComfyNode): - """ - Generates videos synchronously based on prompt and output_size. - """ - @classmethod def define_schema(cls) -> IO.Schema: return IO.Schema( node_id="LumaVideoNode", display_name="Luma Text to Video", category="api node/video/Luma", - description=cleandoc(cls.__doc__ or ""), + description="Generates videos synchronously based on prompt and output_size.", inputs=[ IO.String.Input( "prompt", @@ -498,7 +386,7 @@ def define_schema(cls) -> IO.Schema: "luma_concepts", tooltip="Optional Camera Concepts to dictate camera motion via the Luma Concepts node.", optional=True, - ) + ), ], outputs=[IO.Video.Output()], hidden=[ @@ -519,24 +407,17 @@ async def execute( duration: str, loop: bool, seed, - luma_concepts: LumaConceptChain = None, + luma_concepts: Optional[LumaConceptChain] = None, ) -> IO.NodeOutput: validate_string(prompt, strip_whitespace=False, min_length=3) duration = duration if model != LumaVideoModel.ray_1_6 else None resolution = resolution if model != LumaVideoModel.ray_1_6 else None - auth_kwargs = { - "auth_token": cls.hidden.auth_token_comfy_org, - "comfy_api_key": cls.hidden.api_key_comfy_org, - } - operation = SynchronousOperation( - endpoint=ApiEndpoint( - path="/proxy/luma/generations", - method=HttpMethod.POST, - request_model=LumaGenerationRequest, - response_model=LumaGeneration, - ), - request=LumaGenerationRequest( + response_api = await sync_op( + cls, + ApiEndpoint(path="/proxy/luma/generations", method="POST"), + response_model=LumaGeneration, + data=LumaGenerationRequest( prompt=prompt, model=model, resolution=resolution, @@ -545,47 +426,25 @@ async def execute( loop=loop, concepts=luma_concepts.create_api_model() if luma_concepts else None, ), - auth_kwargs=auth_kwargs, ) - response_api: LumaGeneration = await operation.execute() - - if cls.hidden.unique_id: - PromptServer.instance.send_progress_text(f"Luma video generation started: {response_api.id}", cls.hidden.unique_id) - - operation = PollingOperation( - poll_endpoint=ApiEndpoint( - path=f"/proxy/luma/generations/{response_api.id}", - method=HttpMethod.GET, - request_model=EmptyRequest, - response_model=LumaGeneration, - ), - completed_statuses=[LumaState.completed], - failed_statuses=[LumaState.failed], + response_poll = await poll_op( + cls, + ApiEndpoint(path=f"/proxy/luma/generations/{response_api.id}"), + response_model=LumaGeneration, status_extractor=lambda x: x.state, - result_url_extractor=video_result_url_extractor, - node_id=cls.hidden.unique_id, estimated_duration=LUMA_T2V_AVERAGE_DURATION, - auth_kwargs=auth_kwargs, ) - response_poll = await operation.execute() - - async with aiohttp.ClientSession() as session: - async with session.get(response_poll.assets.video) as vid_response: - return IO.NodeOutput(VideoFromFile(BytesIO(await vid_response.content.read()))) + return IO.NodeOutput(await download_url_to_video_output(response_poll.assets.video)) class LumaImageToVideoGenerationNode(IO.ComfyNode): - """ - Generates videos synchronously based on prompt, input images, and output_size. - """ - @classmethod def define_schema(cls) -> IO.Schema: return IO.Schema( node_id="LumaImageToVideoNode", display_name="Luma Image to Video", category="api node/video/Luma", - description=cleandoc(cls.__doc__ or ""), + description="Generates videos synchronously based on prompt, input images, and output_size.", inputs=[ IO.String.Input( "prompt", @@ -637,7 +496,7 @@ def define_schema(cls) -> IO.Schema: "luma_concepts", tooltip="Optional Camera Concepts to dictate camera motion via the Luma Concepts node.", optional=True, - ) + ), ], outputs=[IO.Video.Output()], hidden=[ @@ -662,25 +521,15 @@ async def execute( luma_concepts: LumaConceptChain = None, ) -> IO.NodeOutput: if first_image is None and last_image is None: - raise Exception( - "At least one of first_image and last_image requires an input." - ) - auth_kwargs = { - "auth_token": cls.hidden.auth_token_comfy_org, - "comfy_api_key": cls.hidden.api_key_comfy_org, - } - keyframes = await cls._convert_to_keyframes(first_image, last_image, auth_kwargs=auth_kwargs) + raise Exception("At least one of first_image and last_image requires an input.") + keyframes = await cls._convert_to_keyframes(first_image, last_image) duration = duration if model != LumaVideoModel.ray_1_6 else None resolution = resolution if model != LumaVideoModel.ray_1_6 else None - - operation = SynchronousOperation( - endpoint=ApiEndpoint( - path="/proxy/luma/generations", - method=HttpMethod.POST, - request_model=LumaGenerationRequest, - response_model=LumaGeneration, - ), - request=LumaGenerationRequest( + response_api = await sync_op( + cls, + ApiEndpoint(path="/proxy/luma/generations", method="POST"), + response_model=LumaGeneration, + data=LumaGenerationRequest( prompt=prompt, model=model, aspect_ratio=LumaAspectRatio.ratio_16_9, # ignored, but still needed by the API for some reason @@ -690,54 +539,31 @@ async def execute( keyframes=keyframes, concepts=luma_concepts.create_api_model() if luma_concepts else None, ), - auth_kwargs=auth_kwargs, ) - response_api: LumaGeneration = await operation.execute() - - if cls.hidden.unique_id: - PromptServer.instance.send_progress_text(f"Luma video generation started: {response_api.id}", cls.hidden.unique_id) - - operation = PollingOperation( - poll_endpoint=ApiEndpoint( - path=f"/proxy/luma/generations/{response_api.id}", - method=HttpMethod.GET, - request_model=EmptyRequest, - response_model=LumaGeneration, - ), - completed_statuses=[LumaState.completed], - failed_statuses=[LumaState.failed], + response_poll = await poll_op( + cls, + poll_endpoint=ApiEndpoint(path=f"/proxy/luma/generations/{response_api.id}"), + response_model=LumaGeneration, status_extractor=lambda x: x.state, - result_url_extractor=video_result_url_extractor, - node_id=cls.hidden.unique_id, estimated_duration=LUMA_I2V_AVERAGE_DURATION, - auth_kwargs=auth_kwargs, ) - response_poll = await operation.execute() - - async with aiohttp.ClientSession() as session: - async with session.get(response_poll.assets.video) as vid_response: - return IO.NodeOutput(VideoFromFile(BytesIO(await vid_response.content.read()))) + return IO.NodeOutput(await download_url_to_video_output(response_poll.assets.video)) @classmethod async def _convert_to_keyframes( cls, first_image: torch.Tensor = None, last_image: torch.Tensor = None, - auth_kwargs: Optional[dict[str,str]] = None, ): if first_image is None and last_image is None: return None frame0 = None frame1 = None if first_image is not None: - download_urls = await upload_images_to_comfyapi( - first_image, max_images=1, auth_kwargs=auth_kwargs, - ) + download_urls = await upload_images_to_comfyapi(cls, first_image, max_images=1) frame0 = LumaImageReference(type="image", url=download_urls[0]) if last_image is not None: - download_urls = await upload_images_to_comfyapi( - last_image, max_images=1, auth_kwargs=auth_kwargs, - ) + download_urls = await upload_images_to_comfyapi(cls, last_image, max_images=1) frame1 = LumaImageReference(type="image", url=download_urls[0]) return LumaKeyframes(frame0=frame0, frame1=frame1) diff --git a/comfy_api_nodes/nodes_minimax.py b/comfy_api_nodes/nodes_minimax.py index e3722e79b715..05cbb700f6e0 100644 --- a/comfy_api_nodes/nodes_minimax.py +++ b/comfy_api_nodes/nodes_minimax.py @@ -1,71 +1,57 @@ -from inspect import cleandoc from typing import Optional -import logging -import torch +import torch from typing_extensions import override -from comfy_api.latest import ComfyExtension, IO -from comfy_api.input_impl.video_types import VideoFromFile -from comfy_api_nodes.apis import ( - MinimaxVideoGenerationRequest, - MinimaxVideoGenerationResponse, + +from comfy_api.latest import IO, ComfyExtension +from comfy_api_nodes.apis.minimax_api import ( MinimaxFileRetrieveResponse, + MiniMaxModel, MinimaxTaskResultResponse, + MinimaxVideoGenerationRequest, + MinimaxVideoGenerationResponse, SubjectReferenceItem, - MiniMaxModel, ) -from comfy_api_nodes.apis.client import ( +from comfy_api_nodes.util import ( ApiEndpoint, - HttpMethod, - SynchronousOperation, - PollingOperation, - EmptyRequest, -) -from comfy_api_nodes.apinode_utils import ( - download_url_to_bytesio, + download_url_to_video_output, + poll_op, + sync_op, upload_images_to_comfyapi, + validate_string, ) -from comfy_api_nodes.util import validate_string -from server import PromptServer - I2V_AVERAGE_DURATION = 114 T2V_AVERAGE_DURATION = 234 async def _generate_mm_video( + cls: type[IO.ComfyNode], *, - auth: dict[str, str], - node_id: str, prompt_text: str, seed: int, model: str, - image: Optional[torch.Tensor] = None, # used for ImageToVideo - subject: Optional[torch.Tensor] = None, # used for SubjectToVideo + image: Optional[torch.Tensor] = None, # used for ImageToVideo + subject: Optional[torch.Tensor] = None, # used for SubjectToVideo average_duration: Optional[int] = None, ) -> IO.NodeOutput: if image is None: validate_string(prompt_text, field_name="prompt_text") - # upload image, if passed in image_url = None if image is not None: - image_url = (await upload_images_to_comfyapi(image, max_images=1, auth_kwargs=auth))[0] + image_url = (await upload_images_to_comfyapi(cls, image, max_images=1))[0] # TODO: figure out how to deal with subject properly, API returns invalid params when using S2V-01 model subject_reference = None if subject is not None: - subject_url = (await upload_images_to_comfyapi(subject, max_images=1, auth_kwargs=auth))[0] + subject_url = (await upload_images_to_comfyapi(cls, subject, max_images=1))[0] subject_reference = [SubjectReferenceItem(image=subject_url)] - - video_generate_operation = SynchronousOperation( - endpoint=ApiEndpoint( - path="/proxy/minimax/video_generation", - method=HttpMethod.POST, - request_model=MinimaxVideoGenerationRequest, - response_model=MinimaxVideoGenerationResponse, - ), - request=MinimaxVideoGenerationRequest( + response = await sync_op( + cls, + ApiEndpoint(path="/proxy/minimax/video_generation", method="POST"), + response_model=MinimaxVideoGenerationResponse, + data=MinimaxVideoGenerationRequest( model=MiniMaxModel(model), prompt=prompt_text, callback_url=None, @@ -73,81 +59,50 @@ async def _generate_mm_video( subject_reference=subject_reference, prompt_optimizer=None, ), - auth_kwargs=auth, ) - response = await video_generate_operation.execute() task_id = response.task_id if not task_id: raise Exception(f"MiniMax generation failed: {response.base_resp}") - video_generate_operation = PollingOperation( - poll_endpoint=ApiEndpoint( - path="/proxy/minimax/query/video_generation", - method=HttpMethod.GET, - request_model=EmptyRequest, - response_model=MinimaxTaskResultResponse, - query_params={"task_id": task_id}, - ), - completed_statuses=["Success"], - failed_statuses=["Fail"], + task_result = await poll_op( + cls, + ApiEndpoint(path="/proxy/minimax/query/video_generation", query_params={"task_id": task_id}), + response_model=MinimaxTaskResultResponse, status_extractor=lambda x: x.status.value, estimated_duration=average_duration, - node_id=node_id, - auth_kwargs=auth, ) - task_result = await video_generate_operation.execute() file_id = task_result.file_id if file_id is None: raise Exception("Request was not successful. Missing file ID.") - file_retrieve_operation = SynchronousOperation( - endpoint=ApiEndpoint( - path="/proxy/minimax/files/retrieve", - method=HttpMethod.GET, - request_model=EmptyRequest, - response_model=MinimaxFileRetrieveResponse, - query_params={"file_id": int(file_id)}, - ), - request=EmptyRequest(), - auth_kwargs=auth, + file_result = await sync_op( + cls, + ApiEndpoint(path="/proxy/minimax/files/retrieve", query_params={"file_id": int(file_id)}), + response_model=MinimaxFileRetrieveResponse, ) - file_result = await file_retrieve_operation.execute() file_url = file_result.file.download_url if file_url is None: - raise Exception( - f"No video was found in the response. Full response: {file_result.model_dump()}" - ) - logging.info("Generated video URL: %s", file_url) - if node_id: - if hasattr(file_result.file, "backup_download_url"): - message = f"Result URL: {file_url}\nBackup URL: {file_result.file.backup_download_url}" - else: - message = f"Result URL: {file_url}" - PromptServer.instance.send_progress_text(message, node_id) - - # Download and return as VideoFromFile - video_io = await download_url_to_bytesio(file_url) - if video_io is None: - error_msg = f"Failed to download video from {file_url}" - logging.error(error_msg) - raise Exception(error_msg) - return IO.NodeOutput(VideoFromFile(video_io)) + raise Exception(f"No video was found in the response. Full response: {file_result.model_dump()}") + if file_result.file.backup_download_url: + try: + return IO.NodeOutput(await download_url_to_video_output(file_url, timeout=10, max_retries=2)) + except Exception: # if we have a second URL to retrieve the result, try again using that one + return IO.NodeOutput( + await download_url_to_video_output(file_result.file.backup_download_url, max_retries=3) + ) + return IO.NodeOutput(await download_url_to_video_output(file_url)) class MinimaxTextToVideoNode(IO.ComfyNode): - """ - Generates videos synchronously based on a prompt, and optional parameters using MiniMax's API. - """ - @classmethod def define_schema(cls) -> IO.Schema: return IO.Schema( node_id="MinimaxTextToVideoNode", display_name="MiniMax Text to Video", category="api node/video/MiniMax", - description=cleandoc(cls.__doc__ or ""), + description="Generates videos synchronously based on a prompt, and optional parameters.", inputs=[ IO.String.Input( "prompt_text", @@ -189,11 +144,7 @@ async def execute( seed: int = 0, ) -> IO.NodeOutput: return await _generate_mm_video( - auth={ - "auth_token": cls.hidden.auth_token_comfy_org, - "comfy_api_key": cls.hidden.api_key_comfy_org, - }, - node_id=cls.hidden.unique_id, + cls, prompt_text=prompt_text, seed=seed, model=model, @@ -204,17 +155,13 @@ async def execute( class MinimaxImageToVideoNode(IO.ComfyNode): - """ - Generates videos synchronously based on an image and prompt, and optional parameters using MiniMax's API. - """ - @classmethod def define_schema(cls) -> IO.Schema: return IO.Schema( node_id="MinimaxImageToVideoNode", display_name="MiniMax Image to Video", category="api node/video/MiniMax", - description=cleandoc(cls.__doc__ or ""), + description="Generates videos synchronously based on an image and prompt, and optional parameters.", inputs=[ IO.Image.Input( "image", @@ -261,11 +208,7 @@ async def execute( seed: int = 0, ) -> IO.NodeOutput: return await _generate_mm_video( - auth={ - "auth_token": cls.hidden.auth_token_comfy_org, - "comfy_api_key": cls.hidden.api_key_comfy_org, - }, - node_id=cls.hidden.unique_id, + cls, prompt_text=prompt_text, seed=seed, model=model, @@ -276,17 +219,13 @@ async def execute( class MinimaxSubjectToVideoNode(IO.ComfyNode): - """ - Generates videos synchronously based on an image and prompt, and optional parameters using MiniMax's API. - """ - @classmethod def define_schema(cls) -> IO.Schema: return IO.Schema( node_id="MinimaxSubjectToVideoNode", display_name="MiniMax Subject to Video", category="api node/video/MiniMax", - description=cleandoc(cls.__doc__ or ""), + description="Generates videos synchronously based on an image and prompt, and optional parameters.", inputs=[ IO.Image.Input( "subject", @@ -333,11 +272,7 @@ async def execute( seed: int = 0, ) -> IO.NodeOutput: return await _generate_mm_video( - auth={ - "auth_token": cls.hidden.auth_token_comfy_org, - "comfy_api_key": cls.hidden.api_key_comfy_org, - }, - node_id=cls.hidden.unique_id, + cls, prompt_text=prompt_text, seed=seed, model=model, @@ -348,15 +283,13 @@ async def execute( class MinimaxHailuoVideoNode(IO.ComfyNode): - """Generates videos from prompt, with optional start frame using the new MiniMax Hailuo-02 model.""" - @classmethod def define_schema(cls) -> IO.Schema: return IO.Schema( node_id="MinimaxHailuoVideoNode", display_name="MiniMax Hailuo Video", category="api node/video/MiniMax", - description=cleandoc(cls.__doc__ or ""), + description="Generates videos from prompt, with optional start frame using the new MiniMax Hailuo-02 model.", inputs=[ IO.String.Input( "prompt_text", @@ -420,10 +353,6 @@ async def execute( resolution: str = "768P", model: str = "MiniMax-Hailuo-02", ) -> IO.NodeOutput: - auth = { - "auth_token": cls.hidden.auth_token_comfy_org, - "comfy_api_key": cls.hidden.api_key_comfy_org, - } if first_frame_image is None: validate_string(prompt_text, field_name="prompt_text") @@ -435,16 +364,13 @@ async def execute( # upload image, if passed in image_url = None if first_frame_image is not None: - image_url = (await upload_images_to_comfyapi(first_frame_image, max_images=1, auth_kwargs=auth))[0] - - video_generate_operation = SynchronousOperation( - endpoint=ApiEndpoint( - path="/proxy/minimax/video_generation", - method=HttpMethod.POST, - request_model=MinimaxVideoGenerationRequest, - response_model=MinimaxVideoGenerationResponse, - ), - request=MinimaxVideoGenerationRequest( + image_url = (await upload_images_to_comfyapi(cls, first_frame_image, max_images=1))[0] + + response = await sync_op( + cls, + ApiEndpoint(path="/proxy/minimax/video_generation", method="POST"), + response_model=MinimaxVideoGenerationResponse, + data=MinimaxVideoGenerationRequest( model=MiniMaxModel(model), prompt=prompt_text, callback_url=None, @@ -453,67 +379,42 @@ async def execute( duration=duration, resolution=resolution, ), - auth_kwargs=auth, ) - response = await video_generate_operation.execute() task_id = response.task_id if not task_id: raise Exception(f"MiniMax generation failed: {response.base_resp}") average_duration = 120 if resolution == "768P" else 240 - video_generate_operation = PollingOperation( - poll_endpoint=ApiEndpoint( - path="/proxy/minimax/query/video_generation", - method=HttpMethod.GET, - request_model=EmptyRequest, - response_model=MinimaxTaskResultResponse, - query_params={"task_id": task_id}, - ), - completed_statuses=["Success"], - failed_statuses=["Fail"], + task_result = await poll_op( + cls, + ApiEndpoint(path="/proxy/minimax/query/video_generation", query_params={"task_id": task_id}), + response_model=MinimaxTaskResultResponse, status_extractor=lambda x: x.status.value, estimated_duration=average_duration, - node_id=cls.hidden.unique_id, - auth_kwargs=auth, ) - task_result = await video_generate_operation.execute() file_id = task_result.file_id if file_id is None: raise Exception("Request was not successful. Missing file ID.") - file_retrieve_operation = SynchronousOperation( - endpoint=ApiEndpoint( - path="/proxy/minimax/files/retrieve", - method=HttpMethod.GET, - request_model=EmptyRequest, - response_model=MinimaxFileRetrieveResponse, - query_params={"file_id": int(file_id)}, - ), - request=EmptyRequest(), - auth_kwargs=auth, + file_result = await sync_op( + cls, + ApiEndpoint(path="/proxy/minimax/files/retrieve", query_params={"file_id": int(file_id)}), + response_model=MinimaxFileRetrieveResponse, ) - file_result = await file_retrieve_operation.execute() file_url = file_result.file.download_url if file_url is None: - raise Exception( - f"No video was found in the response. Full response: {file_result.model_dump()}" - ) - logging.info("Generated video URL: %s", file_url) - if cls.hidden.unique_id: - if hasattr(file_result.file, "backup_download_url"): - message = f"Result URL: {file_url}\nBackup URL: {file_result.file.backup_download_url}" - else: - message = f"Result URL: {file_url}" - PromptServer.instance.send_progress_text(message, cls.hidden.unique_id) - - video_io = await download_url_to_bytesio(file_url) - if video_io is None: - error_msg = f"Failed to download video from {file_url}" - logging.error(error_msg) - raise Exception(error_msg) - return IO.NodeOutput(VideoFromFile(video_io)) + raise Exception(f"No video was found in the response. Full response: {file_result.model_dump()}") + + if file_result.file.backup_download_url: + try: + return IO.NodeOutput(await download_url_to_video_output(file_url, timeout=10, max_retries=2)) + except Exception: # if we have a second URL to retrieve the result, try again using that one + return IO.NodeOutput( + await download_url_to_video_output(file_result.file.backup_download_url, max_retries=3) + ) + return IO.NodeOutput(await download_url_to_video_output(file_url)) class MinimaxExtension(ComfyExtension): diff --git a/comfy_api_nodes/util/client.py b/comfy_api_nodes/util/client.py index 9c036d64b55d..9ae512fe5418 100644 --- a/comfy_api_nodes/util/client.py +++ b/comfy_api_nodes/util/client.py @@ -78,7 +78,7 @@ class _PollUIState: _RETRY_STATUS = {408, 429, 500, 502, 503, 504} COMPLETED_STATUSES = ["succeeded", "succeed", "success", "completed"] -FAILED_STATUSES = ["cancelled", "canceled", "failed", "error"] +FAILED_STATUSES = ["cancelled", "canceled", "fail", "failed", "error"] QUEUED_STATUSES = ["created", "queued", "queueing", "submitted"] diff --git a/comfy_api_nodes/util/download_helpers.py b/comfy_api_nodes/util/download_helpers.py index f89045e12ad4..364874bedb9d 100644 --- a/comfy_api_nodes/util/download_helpers.py +++ b/comfy_api_nodes/util/download_helpers.py @@ -232,11 +232,12 @@ async def download_url_to_video_output( video_url: str, *, timeout: float = None, + max_retries: int = 5, cls: type[COMFY_IO.ComfyNode] = None, ) -> VideoFromFile: """Downloads a video from a URL and returns a `VIDEO` output.""" result = BytesIO() - await download_url_to_bytesio(video_url, result, timeout=timeout, cls=cls) + await download_url_to_bytesio(video_url, result, timeout=timeout, max_retries=max_retries, cls=cls) return VideoFromFile(result) From 1a58087ac2eb64be3645934d0025aafaa5bdce38 Mon Sep 17 00:00:00 2001 From: comfyanonymous <121283862+comfyanonymous@users.noreply.github.com> Date: Wed, 29 Oct 2025 12:43:51 -0700 Subject: [PATCH 2/5] Reduce memory usage for fp8 scaled op. (#10531) --- comfy/quant_ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/comfy/quant_ops.py b/comfy/quant_ops.py index b14e03084e8f..fb35a0d40b43 100644 --- a/comfy/quant_ops.py +++ b/comfy/quant_ops.py @@ -358,7 +358,7 @@ def quantize(cls, tensor, scale=None, dtype=torch.float8_e4m3fn): scale = scale.to(device=tensor.device, dtype=torch.float32) lp_amax = torch.finfo(dtype).max - tensor_scaled = tensor.float() / scale + tensor_scaled = tensor * (1.0 / scale).to(tensor.dtype) torch.clamp(tensor_scaled, min=-lp_amax, max=lp_amax, out=tensor_scaled) qdata = tensor_scaled.to(dtype, memory_format=torch.contiguous_format) From ec4fc2a09a390d0d81500c51fb9e4d8a7a5ce1fc Mon Sep 17 00:00:00 2001 From: comfyanonymous <121283862+comfyanonymous@users.noreply.github.com> Date: Wed, 29 Oct 2025 12:48:06 -0700 Subject: [PATCH 3/5] Fix case of weights not being unpinned. (#10533) --- comfy/model_patcher.py | 1 + 1 file changed, 1 insertion(+) diff --git a/comfy/model_patcher.py b/comfy/model_patcher.py index aec73349c8e5..119119e96aec 100644 --- a/comfy/model_patcher.py +++ b/comfy/model_patcher.py @@ -1283,5 +1283,6 @@ def clean_hooks(self): self.clear_cached_hook_weights() def __del__(self): + self.unpin_all_weights() self.detach(unpatch_all=False) From ab7ab5be23fb9b71d1790f424e7dcf91dc1fe0cc Mon Sep 17 00:00:00 2001 From: rattus <46076784+rattus128@users.noreply.github.com> Date: Thu, 30 Oct 2025 07:17:46 +1000 Subject: [PATCH 4/5] Fix Race condition in --async-offload that can cause corruption (#10501) * mm: factor out the current stream getter Make this a reusable function. * ops: sync the offload stream with the consumption of w&b This sync is nessacary as pytorch will queue cuda async frees on the same stream as created to tensor. In the case of async offload, this will be on the offload stream. Weights and biases can go out of scope in python which then triggers the pytorch garbage collector to queue the free operation on the offload stream possible before the compute stream has used the weight. This causes a use after free on weight data leading to total corruption of some workflows. So sync the offload stream with the compute stream after the weight has been used so the free has to wait for the weight to be used. The cast_bias_weight is extended in a backwards compatible way with the new behaviour opt-in on a defaulted parameter. This handles custom node packs calling cast_bias_weight and defeatures async-offload for them (as they do not handle the race). The pattern is now: cast_bias_weight(... , offloadable=True) #This might be offloaded thing(weight, bias, ...) uncast_bias_weight(...) * controlnet: adopt new cast_bias_weight synchronization scheme This is nessacary for safe async weight offloading. * mm: sync the last stream in the queue, not the next Currently this peeks ahead to sync the next stream in the queue of streams with the compute stream. This doesnt allow a lot of parallelization, as then end result is you can only get one weight load ahead regardless of how many streams you have. Rotate the loop logic here to synchronize the end of the queue before returning the next stream. This allows weights to be loaded ahead of the compute streams position. --- comfy/controlnet.py | 17 +++--- comfy/model_management.py | 28 +++++---- comfy/ops.py | 121 ++++++++++++++++++++++++++++---------- 3 files changed, 114 insertions(+), 52 deletions(-) diff --git a/comfy/controlnet.py b/comfy/controlnet.py index f08ff4b36bd8..0b5e30f52a62 100644 --- a/comfy/controlnet.py +++ b/comfy/controlnet.py @@ -310,11 +310,13 @@ def __init__(self, in_features: int, out_features: int, bias: bool = True, self.bias = None def forward(self, input): - weight, bias = comfy.ops.cast_bias_weight(self, input) + weight, bias, offload_stream = comfy.ops.cast_bias_weight(self, input, offloadable=True) if self.up is not None: - return torch.nn.functional.linear(input, weight + (torch.mm(self.up.flatten(start_dim=1), self.down.flatten(start_dim=1))).reshape(self.weight.shape).type(input.dtype), bias) + x = torch.nn.functional.linear(input, weight + (torch.mm(self.up.flatten(start_dim=1), self.down.flatten(start_dim=1))).reshape(self.weight.shape).type(input.dtype), bias) else: - return torch.nn.functional.linear(input, weight, bias) + x = torch.nn.functional.linear(input, weight, bias) + comfy.ops.uncast_bias_weight(self, weight, bias, offload_stream) + return x class Conv2d(torch.nn.Module, comfy.ops.CastWeightBiasOp): def __init__( @@ -350,12 +352,13 @@ def __init__( def forward(self, input): - weight, bias = comfy.ops.cast_bias_weight(self, input) + weight, bias, offload_stream = comfy.ops.cast_bias_weight(self, input, offloadable=True) if self.up is not None: - return torch.nn.functional.conv2d(input, weight + (torch.mm(self.up.flatten(start_dim=1), self.down.flatten(start_dim=1))).reshape(self.weight.shape).type(input.dtype), bias, self.stride, self.padding, self.dilation, self.groups) + x = torch.nn.functional.conv2d(input, weight + (torch.mm(self.up.flatten(start_dim=1), self.down.flatten(start_dim=1))).reshape(self.weight.shape).type(input.dtype), bias, self.stride, self.padding, self.dilation, self.groups) else: - return torch.nn.functional.conv2d(input, weight, bias, self.stride, self.padding, self.dilation, self.groups) - + x = torch.nn.functional.conv2d(input, weight, bias, self.stride, self.padding, self.dilation, self.groups) + comfy.ops.uncast_bias_weight(self, weight, bias, offload_stream) + return x class ControlLora(ControlNet): def __init__(self, control_weights, global_average_pooling=False, model_options={}): #TODO? model_options diff --git a/comfy/model_management.py b/comfy/model_management.py index 3e5b977d4375..79c0dfdb4b7e 100644 --- a/comfy/model_management.py +++ b/comfy/model_management.py @@ -1013,6 +1013,16 @@ def force_channels_last(): NUM_STREAMS = 2 logging.info("Using async weight offloading with {} streams".format(NUM_STREAMS)) +def current_stream(device): + if device is None: + return None + if is_device_cuda(device): + return torch.cuda.current_stream() + elif is_device_xpu(device): + return torch.xpu.current_stream() + else: + return None + stream_counters = {} def get_offload_stream(device): stream_counter = stream_counters.get(device, 0) @@ -1021,21 +1031,17 @@ def get_offload_stream(device): if device in STREAMS: ss = STREAMS[device] - s = ss[stream_counter] + #Sync the oldest stream in the queue with the current + ss[stream_counter].wait_stream(current_stream(device)) stream_counter = (stream_counter + 1) % len(ss) - if is_device_cuda(device): - ss[stream_counter].wait_stream(torch.cuda.current_stream()) - elif is_device_xpu(device): - ss[stream_counter].wait_stream(torch.xpu.current_stream()) stream_counters[device] = stream_counter - return s + return ss[stream_counter] elif is_device_cuda(device): ss = [] for k in range(NUM_STREAMS): ss.append(torch.cuda.Stream(device=device, priority=0)) STREAMS[device] = ss s = ss[stream_counter] - stream_counter = (stream_counter + 1) % len(ss) stream_counters[device] = stream_counter return s elif is_device_xpu(device): @@ -1044,18 +1050,14 @@ def get_offload_stream(device): ss.append(torch.xpu.Stream(device=device, priority=0)) STREAMS[device] = ss s = ss[stream_counter] - stream_counter = (stream_counter + 1) % len(ss) stream_counters[device] = stream_counter return s return None def sync_stream(device, stream): - if stream is None: + if stream is None or current_stream(device) is None: return - if is_device_cuda(device): - torch.cuda.current_stream().wait_stream(stream) - elif is_device_xpu(device): - torch.xpu.current_stream().wait_stream(stream) + current_stream(device).wait_stream(stream) def cast_to(weight, dtype=None, device=None, non_blocking=False, copy=False, stream=None): if device is None or weight.device == device: diff --git a/comfy/ops.py b/comfy/ops.py index 93731eedf276..71ca7a2bd11e 100644 --- a/comfy/ops.py +++ b/comfy/ops.py @@ -70,8 +70,12 @@ def scaled_dot_product_attention(q, k, v, *args, **kwargs): def cast_to_input(weight, input, non_blocking=False, copy=True): return comfy.model_management.cast_to(weight, input.dtype, input.device, non_blocking=non_blocking, copy=copy) + @torch.compiler.disable() -def cast_bias_weight(s, input=None, dtype=None, device=None, bias_dtype=None): +def cast_bias_weight(s, input=None, dtype=None, device=None, bias_dtype=None, offloadable=False): + # NOTE: offloadable=False is a a legacy and if you are a custom node author reading this please pass + # offloadable=True and call uncast_bias_weight() after your last usage of the weight/bias. This + # will add async-offload support to your cast and improve performance. if input is not None: if dtype is None: dtype = input.dtype @@ -80,7 +84,11 @@ def cast_bias_weight(s, input=None, dtype=None, device=None, bias_dtype=None): if device is None: device = input.device - offload_stream = comfy.model_management.get_offload_stream(device) + if offloadable: + offload_stream = comfy.model_management.get_offload_stream(device) + else: + offload_stream = None + if offload_stream is not None: wf_context = offload_stream else: @@ -105,7 +113,24 @@ def cast_bias_weight(s, input=None, dtype=None, device=None, bias_dtype=None): weight = f(weight) comfy.model_management.sync_stream(device, offload_stream) - return weight, bias + if offloadable: + return weight, bias, offload_stream + else: + #Legacy function signature + return weight, bias + + +def uncast_bias_weight(s, weight, bias, offload_stream): + if offload_stream is None: + return + if weight is not None: + device = weight.device + else: + if bias is None: + return + device = bias.device + offload_stream.wait_stream(comfy.model_management.current_stream(device)) + class CastWeightBiasOp: comfy_cast_weights = False @@ -118,8 +143,10 @@ def reset_parameters(self): return None def forward_comfy_cast_weights(self, input): - weight, bias = cast_bias_weight(self, input) - return torch.nn.functional.linear(input, weight, bias) + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) + x = torch.nn.functional.linear(input, weight, bias) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def forward(self, *args, **kwargs): run_every_op() @@ -133,8 +160,10 @@ def reset_parameters(self): return None def forward_comfy_cast_weights(self, input): - weight, bias = cast_bias_weight(self, input) - return self._conv_forward(input, weight, bias) + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) + x = self._conv_forward(input, weight, bias) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def forward(self, *args, **kwargs): run_every_op() @@ -148,8 +177,10 @@ def reset_parameters(self): return None def forward_comfy_cast_weights(self, input): - weight, bias = cast_bias_weight(self, input) - return self._conv_forward(input, weight, bias) + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) + x = self._conv_forward(input, weight, bias) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def forward(self, *args, **kwargs): run_every_op() @@ -172,8 +203,10 @@ def _conv_forward(self, input, weight, bias, *args, **kwargs): return super()._conv_forward(input, weight, bias, *args, **kwargs) def forward_comfy_cast_weights(self, input): - weight, bias = cast_bias_weight(self, input) - return self._conv_forward(input, weight, bias) + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) + x = self._conv_forward(input, weight, bias) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def forward(self, *args, **kwargs): run_every_op() @@ -187,8 +220,10 @@ def reset_parameters(self): return None def forward_comfy_cast_weights(self, input): - weight, bias = cast_bias_weight(self, input) - return torch.nn.functional.group_norm(input, self.num_groups, weight, bias, self.eps) + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) + x = torch.nn.functional.group_norm(input, self.num_groups, weight, bias, self.eps) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def forward(self, *args, **kwargs): run_every_op() @@ -203,11 +238,14 @@ def reset_parameters(self): def forward_comfy_cast_weights(self, input): if self.weight is not None: - weight, bias = cast_bias_weight(self, input) + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) else: weight = None bias = None - return torch.nn.functional.layer_norm(input, self.normalized_shape, weight, bias, self.eps) + offload_stream = None + x = torch.nn.functional.layer_norm(input, self.normalized_shape, weight, bias, self.eps) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def forward(self, *args, **kwargs): run_every_op() @@ -223,11 +261,15 @@ def reset_parameters(self): def forward_comfy_cast_weights(self, input): if self.weight is not None: - weight, bias = cast_bias_weight(self, input) + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) else: weight = None - return comfy.rmsnorm.rms_norm(input, weight, self.eps) # TODO: switch to commented out line when old torch is deprecated - # return torch.nn.functional.rms_norm(input, self.normalized_shape, weight, self.eps) + bias = None + offload_stream = None + x = comfy.rmsnorm.rms_norm(input, weight, self.eps) # TODO: switch to commented out line when old torch is deprecated + # x = torch.nn.functional.rms_norm(input, self.normalized_shape, weight, self.eps) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def forward(self, *args, **kwargs): run_every_op() @@ -246,10 +288,12 @@ def forward_comfy_cast_weights(self, input, output_size=None): input, output_size, self.stride, self.padding, self.kernel_size, num_spatial_dims, self.dilation) - weight, bias = cast_bias_weight(self, input) - return torch.nn.functional.conv_transpose2d( + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) + x = torch.nn.functional.conv_transpose2d( input, weight, bias, self.stride, self.padding, output_padding, self.groups, self.dilation) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def forward(self, *args, **kwargs): run_every_op() @@ -268,10 +312,12 @@ def forward_comfy_cast_weights(self, input, output_size=None): input, output_size, self.stride, self.padding, self.kernel_size, num_spatial_dims, self.dilation) - weight, bias = cast_bias_weight(self, input) - return torch.nn.functional.conv_transpose1d( + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) + x = torch.nn.functional.conv_transpose1d( input, weight, bias, self.stride, self.padding, output_padding, self.groups, self.dilation) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def forward(self, *args, **kwargs): run_every_op() @@ -289,8 +335,11 @@ def forward_comfy_cast_weights(self, input, out_dtype=None): output_dtype = out_dtype if self.weight.dtype == torch.float16 or self.weight.dtype == torch.bfloat16: out_dtype = None - weight, bias = cast_bias_weight(self, device=input.device, dtype=out_dtype) - return torch.nn.functional.embedding(input, weight, self.padding_idx, self.max_norm, self.norm_type, self.scale_grad_by_freq, self.sparse).to(dtype=output_dtype) + weight, bias, offload_stream = cast_bias_weight(self, device=input.device, dtype=out_dtype, offloadable=True) + x = torch.nn.functional.embedding(input, weight, self.padding_idx, self.max_norm, self.norm_type, self.scale_grad_by_freq, self.sparse).to(dtype=output_dtype) + uncast_bias_weight(self, weight, bias, offload_stream) + return x + def forward(self, *args, **kwargs): run_every_op() @@ -361,7 +410,7 @@ def fp8_linear(self, input): input_dtype = input.dtype if len(input.shape) == 3: - w, bias = cast_bias_weight(self, input, dtype=dtype, bias_dtype=input_dtype) + w, bias, offload_stream = cast_bias_weight(self, input, dtype=dtype, bias_dtype=input_dtype, offloadable=True) scale_weight = self.scale_weight scale_input = self.scale_input @@ -382,6 +431,8 @@ def fp8_linear(self, input): quantized_input = QuantizedTensor.from_float(input.reshape(-1, input_shape[2]), TensorCoreFP8Layout, scale=scale_input, dtype=dtype) o = torch.nn.functional.linear(quantized_input, quantized_weight, bias) + uncast_bias_weight(self, w, bias, offload_stream) + if tensor_2d: return o.reshape(input_shape[0], -1) return o.reshape((-1, input_shape[1], self.weight.shape[0])) @@ -404,8 +455,10 @@ def forward_comfy_cast_weights(self, input): except Exception as e: logging.info("Exception during fp8 op: {}".format(e)) - weight, bias = cast_bias_weight(self, input) - return torch.nn.functional.linear(input, weight, bias) + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) + x = torch.nn.functional.linear(input, weight, bias) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def scaled_fp8_ops(fp8_matrix_mult=False, scale_input=False, override_dtype=None): logging.info("Using scaled fp8: fp8 matrix mult: {}, scale input: {}".format(fp8_matrix_mult, scale_input)) @@ -433,12 +486,14 @@ def forward_comfy_cast_weights(self, input): if out is not None: return out - weight, bias = cast_bias_weight(self, input) + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) if weight.numel() < input.numel(): #TODO: optimize - return torch.nn.functional.linear(input, weight * self.scale_weight.to(device=weight.device, dtype=weight.dtype), bias) + x = torch.nn.functional.linear(input, weight * self.scale_weight.to(device=weight.device, dtype=weight.dtype), bias) else: - return torch.nn.functional.linear(input * self.scale_weight.to(device=weight.device, dtype=weight.dtype), weight, bias) + x = torch.nn.functional.linear(input * self.scale_weight.to(device=weight.device, dtype=weight.dtype), weight, bias) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def convert_weight(self, weight, inplace=False, **kwargs): if inplace: @@ -577,8 +632,10 @@ def _forward(self, input, weight, bias): return torch.nn.functional.linear(input, weight, bias) def forward_comfy_cast_weights(self, input): - weight, bias = cast_bias_weight(self, input) - return self._forward(input, weight, bias) + weight, bias, offload_stream = cast_bias_weight(self, input, offloadable=True) + x = self._forward(input, weight, bias) + uncast_bias_weight(self, weight, bias, offload_stream) + return x def forward(self, input, *args, **kwargs): run_every_op() From 25de7b1bfa22dd98922f047a1342cc97f8e46c5b Mon Sep 17 00:00:00 2001 From: comfyanonymous <121283862+comfyanonymous@users.noreply.github.com> Date: Wed, 29 Oct 2025 14:20:27 -0700 Subject: [PATCH 5/5] Try to fix slow load issue on low ram hardware with pinned mem. (#10536) --- comfy/model_patcher.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/comfy/model_patcher.py b/comfy/model_patcher.py index 119119e96aec..74b9e48bc241 100644 --- a/comfy/model_patcher.py +++ b/comfy/model_patcher.py @@ -658,6 +658,7 @@ def load(self, device_to=None, lowvram_model_memory=0, force_patch_weights=False loading = self._load_list() load_completely = [] + offloaded = [] loading.sort(reverse=True) for x in loading: n = x[1] @@ -699,8 +700,7 @@ def load(self, device_to=None, lowvram_model_memory=0, force_patch_weights=False patch_counter += 1 cast_weight = True - for param in params: - self.pin_weight_to_device("{}.{}".format(n, param)) + offloaded.append((module_mem, n, m, params)) else: if hasattr(m, "comfy_cast_weights"): wipe_lowvram_weight(m) @@ -741,6 +741,12 @@ def load(self, device_to=None, lowvram_model_memory=0, force_patch_weights=False for x in load_completely: x[2].to(device_to) + for x in offloaded: + n = x[1] + params = x[3] + for param in params: + self.pin_weight_to_device("{}.{}".format(n, param)) + if lowvram_counter > 0: logging.info("loaded partially {} {} {}".format(lowvram_model_memory / (1024 * 1024), mem_counter / (1024 * 1024), patch_counter)) self.model.model_lowvram = True