Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions docs/MatrixOne-Intelligence/workflow api/data_connect_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@ else:

### 创建载入任务

#### 连接器载入

通过已配置的连接器载入文件到数据卷。

```
POST /task
```
Expand Down Expand Up @@ -519,6 +523,233 @@ print(response.json())
{'code': 'OK', 'msg': 'OK'}
```

#### 本地上传载入

直接上传本地文件到数据卷,无需预先配置连接器。

```
POST /connectors/upload
```

**输入参数:**

| 参数 | 是否必填 |含义|
| --------------- | ------- |---- |
| file | 是 | 要上传的文件(multipart/form-data 格式) |
| VolumeID | 是 | 目标数据卷 ID |
| meta | 是 | 文件元数据(JSON 格式的数组) |

**meta 参数格式:**

| 参数 | 是否必填 |含义|
| --------------- | ------- |---- |
| file_name | 是 | 文件名称 |
| file_size | 是 | 文件大小(字节) |
| mime_type | 否 | 文件 MIME 类型,默认为 "application/octet-stream" |

**示例:**

```python
import requests
import json
import os
import mimetypes

def upload_file(file_path, volume_id, moi_key, timeout=60):
    """Upload a single local file to a data volume via ``POST /connectors/upload``.

    Args:
        file_path (str): Path of the local file to upload.
        volume_id (str): ID of the target data volume.
        moi_key (str): API key, sent in the ``Moi-Key`` request header.
        timeout (float | tuple | None): Timeout in seconds passed to
            ``requests.post``. ``None`` waits indefinitely.

    Returns:
        The JSON-decoded response body.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing regular file.
        requests.HTTPError: If the server answers with an error status.
    """
    # A directory passes os.path.exists() but cannot be opened for upload,
    # so require a regular file and fail early with a clear error.
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"文件不存在:{file_path}")

    url = "https://freetier-01.cn-hangzhou.cluster.matrixonecloud.cn/connectors/upload"
    headers = {"Moi-Key": moi_key}

    # Collect the file information reported in the ``meta`` form field.
    file_name = os.path.basename(file_path)
    file_size = os.path.getsize(file_path)
    guessed_type, _ = mimetypes.guess_type(file_path)
    mime_type = guessed_type or "application/octet-stream"

    # ``meta`` is a JSON array with one entry per uploaded file.
    meta = json.dumps([{
        "file_name": file_name,
        "file_size": file_size,
        "mime_type": mime_type,
    }])
    data = {
        "VolumeID": volume_id,
        "meta": meta,
    }

    # Keep the file handle open for the whole request: requests streams the
    # multipart body from it while sending.
    with open(file_path, "rb") as f:
        files = {"file": (file_name, f, mime_type)}
        response = requests.post(
            url,
            headers=headers,
            files=files,
            data=data,
            timeout=timeout,
        )

    # Turn HTTP error statuses into exceptions before decoding the body.
    response.raise_for_status()
    return response.json()

# Usage example: replace the placeholder values below before running.
moi_key = "xxxxx"
volume_id = "1889578498228068352"
file_path = "/path/to/your/file.pdf"

result = upload_file(file_path=file_path, volume_id=volume_id, moi_key=moi_key)
print(result)
```

**批量上传示例**(依赖上文单文件示例中定义的 `upload_file` 函数):

```python
import requests
import json
import os
from pathlib import Path

def batch_upload_files(file_paths, volume_id, moi_key):
    """Upload several files one after another and record each outcome.

    Each path is passed to ``upload_file``. An exception raised for one file
    is caught and recorded, and the loop moves on to the next path.

    Args:
        file_paths (list): Paths of the files to upload.
        volume_id (str): ID of the target data volume.
        moi_key (str): API key.

    Returns:
        list: Dicts with ``file_path`` and ``success`` keys, plus ``result``
        for a successful upload or ``error`` (the exception text) for a
        failed one.
    """
    outcomes = []

    for path in file_paths:
        try:
            response_body = upload_file(path, volume_id, moi_key)
            success_entry = {
                'file_path': path,
                'success': True,
                'result': response_body,
            }
            outcomes.append(success_entry)
            print(f"✅ 上传成功:{os.path.basename(path)}")
        except Exception as exc:
            # Best-effort batch: note the failure and continue with the rest.
            failure_entry = {
                'file_path': path,
                'success': False,
                'error': str(exc),
            }
            outcomes.append(failure_entry)
            print(f"❌ 上传失败:{os.path.basename(path)} - {exc}")

    return outcomes

# Usage example: replace the placeholder values below before running.
moi_key = "xxxxx"
volume_id = "1889578498228068352"
file_paths = [
    "/path/to/file1.pdf",
    "/path/to/file2.txt",
    "/path/to/file3.docx",
]

results = batch_upload_files(
    file_paths=file_paths,
    volume_id=volume_id,
    moi_key=moi_key,
)
```

**目录上传示例**(依赖上文批量上传示例中定义的 `batch_upload_files` 函数):

```python
import requests
import json
import os
from pathlib import Path

def upload_directory(directory_path, volume_id, file_extensions=None, recursive=True, moi_key="xxxxx"):
    """Upload the files found in a directory, optionally filtered by extension.

    Args:
        directory_path (str): Directory to scan.
        volume_id (str): ID of the target data volume.
        file_extensions (list | str | None): Allowed extensions, e.g.
            ``['.txt', '.pdf']``. Matching is case-insensitive and the
            leading dot is optional. A falsy value disables filtering.
        recursive (bool): Whether to descend into subdirectories.
        moi_key (str): API key.

    Returns:
        list: The result of ``batch_upload_files`` for the selected files, or
        an empty list when no file matches.

    Raises:
        FileNotFoundError: If ``directory_path`` does not exist.
    """
    if not os.path.exists(directory_path):
        raise FileNotFoundError(f"目录不存在:{directory_path}")

    # Build the allowed-suffix set once instead of once per scanned file.
    allowed_suffixes = None
    if file_extensions:
        if isinstance(file_extensions, str):
            # A bare string would otherwise be iterated character by character.
            file_extensions = [file_extensions]
        # Path.suffix includes the leading dot, so add it when the caller
        # wrote "pdf". An empty string is kept as-is: it matches files
        # that have no suffix.
        allowed_suffixes = {
            ext.lower() if not ext or ext.startswith(".") else f".{ext.lower()}"
            for ext in file_extensions
        }

    pattern = "**/*" if recursive else "*"

    # Sort so the upload order does not depend on filesystem enumeration order.
    file_paths = sorted(
        str(candidate)
        for candidate in Path(directory_path).glob(pattern)
        if candidate.is_file()
        and (allowed_suffixes is None or candidate.suffix.lower() in allowed_suffixes)
    )

    if not file_paths:
        print("❌ 没有找到符合条件的文件")
        return []

    print(f"📁 找到 {len(file_paths)} 个文件准备上传")

    return batch_upload_files(file_paths, volume_id, moi_key)

# Usage example: replace the placeholder values below before running.
moi_key = "xxxxx"
volume_id = "1889578498228068352"
directory_path = "/path/to/your/directory"
file_extensions = ['.pdf', '.txt', '.docx']  # upload only these file types

results = upload_directory(
    directory_path,
    volume_id,
    file_extensions=file_extensions,
    recursive=True,
    moi_key=moi_key,
)
```

返回示例:

```json
{
"code": "OK",
"msg": "OK",
"data": {
"success": true,
"file_id": "1889613341347389440",
"task_id": "1889613340219121664",
"message": "文件上传成功",
"results": [
{
"success": true,
"file_id": "1889613341347389440",
"message": "上传成功"
}
]
}
}
```

### 载入任务列表

```
Expand Down
Loading