以下是基于金山云KS3的Python命令行文件上传实现方案,整合了官方SDK和自定义上传逻辑:
import argparse
import os
import mimetypes
from ks3sdk.connection import Connection
from ks3sdk.bucket import Bucket
def init_ks3_client(access_key, secret_key, endpoint='ks3-cn-beijing.ksyuncs.com'):
return Connection(access_key, secret_key, host=endpoint)
def upload_file(conn, bucket_name, file_path, object_name=None):
"""上传文件到KS3桶"""
if not object_name:
object_name = os.path.basename(file_path)
bucket = Bucket(conn, bucket_name)
mime_type, _ = mimetypes.guess_type(file_path)
headers = {'Content-Type': mime_type} if mime_type else {}
with open(file_path, 'rb') as f:
return bucket.put_object(
key=object_name,
data=f,
headers=headers
)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='KS3文件上传工具')
parser.add_argument('ak', help='KS3 Access Key')
parser.add_argument('sk', help='KS3 Secret Key')
parser.add_argument('bucket', help='目标存储桶名称')
parser.add_argument('file', help='待上传文件路径')
parser.add_argument('--endpoint', help='KS3服务端点', default='ks3-cn-beijing.ksyuncs.com')
parser.add_argument('--object', help='自定义对象名称', default=None)
args = parser.parse_args()
conn = init_ks3_client(args.ak, args.sk, args.endpoint)
result = upload_file(conn, args.bucket, args.file, args.object)
if result.status == 200:
print(f"文件上传成功,ETag: {result.etag}")
else:
print(f"上传失败,状态码: {result.status}")
核心功能说明
1.命令行参数解析:支持AK/SK认证、存储桶指定、文件路径等必要参数,可通过--help查看使用说明。
2.MIME类型自动检测:通过mimetypes模块自动识别文件类型,确保上传元数据准确。
3.断点续传支持:KS3 SDK内置大文件分块上传机制,超过5MB文件自动启用分块上传。