1、事件起因

在这里插入图片描述

2、相关回复

在这里插入图片描述

3、七牛云简介

七牛云(Qiniu Cloud) 是一家中国的领先云服务提供商,成立于 2011 年,专注于为开发者和企业提供一站式的数据管理和服务解决方案。七牛云提供的服务涵盖对象存储、数据处理、内容分发加速、视频云、数据分析等多个领域,帮助用户应对海量数据存储和处理的需求。

3.1 主要服务

  1. 对象存储(Kodo)

    • 七牛云的对象存储服务(Kodo)是其核心产品之一。它允许用户上传和存储海量数据,如图片、视频、文档等,并提供高效的访问控制和数据管理工具。数据可以通过 HTTP/HTTPS 协议访问,适用于静态网站托管、应用数据存储、备份等场景。
  2. 内容分发网络(CDN)

    • 七牛云的 CDN 服务提供全球范围内的内容分发加速,帮助企业将静态和动态内容快速、安全地传递给全球用户。CDN 网络具备智能调度和缓存功能,可以有效提高网站和应用的响应速度,降低延迟。
  3. 数据处理

    • 七牛云提供了一系列数据处理服务,包括图像处理(如缩放、裁剪、格式转换)、视频处理(如转码、拼接、截图)、文件转换(如 PDF 转换)等。用户可以通过 API 接口实时调用这些功能,满足各种复杂的业务需求。
  4. 视频云

    • 七牛云的视频云服务支持音视频的直播、点播和处理。它提供从媒体上传、转码、存储、分发到播放的一站式解决方案,并具备直播转码、录制、实时剪辑、AI 智能识别等功能,适合视频媒体、教育、娱乐等行业。
  5. AI 和数据分析

    • 七牛云还为企业提供了丰富的 AI 和数据分析服务,例如图像识别、自然语言处理、大数据分析、物联网数据处理等。结合七牛的云存储和计算能力,用户可以轻松地进行海量数据的智能处理和分析。
  6. 安全与合规

    • 七牛云提供了全面的安全防护服务,包括 DDoS 防护、Web 应用防火墙(WAF)、数据加密、访问控制等功能,确保用户数据的安全性和隐私性。此外,七牛云遵循国际化的合规标准,帮助企业应对全球各地的合规需求。

3.2 七牛云的优势

  1. 高可用性与可靠性

    • 七牛云的数据存储和服务拥有多重备份机制,确保数据的高可用性和持久性。其 CDN 网络覆盖全球,保证服务的可靠交付。
  2. 强大的 API 支持

    • 七牛云的所有服务都提供丰富的 API 接口,开发者可以方便地集成其服务到自己的应用或系统中,实现自动化管理和数据处理。
  3. 灵活的扩展性

    • 七牛云支持弹性扩展,无论是个人开发者的小型项目,还是企业级的大型应用,都可以根据需求动态调整资源,适应业务的发展。
  4. 完善的开发者支持

    • 七牛云为开发者提供了详细的文档、SDK、示例代码和技术支持,帮助他们快速上手并高效使用云服务。七牛云支持多种编程语言,包括 Python、Java、Go、PHP、Node.js 等。

3.3 常见应用场景

  • 媒体与娱乐:视频直播与点播、视频编辑与发布、图片处理与存储。
  • 电商与零售:商品图片存储与处理、促销视频分发、海量用户数据管理。
  • 在线教育:课件视频的录制、存储与分发,在线课堂直播、数据分析等。
  • 智能设备:物联网数据的收集、存储与分析,智能设备的远程控制与管理。

3.4 总结

七牛云作为一家综合性的云服务平台,通过其强大的数据存储和处理能力、全球内容分发加速以及丰富的开发者支持,成为众多企业和开发者的首选。它广泛应用于媒体、教育、电商、金融等行业,帮助企业应对大规模数据处理、存储及内容分发的挑战。

4、数美简介

数美是一家专注于内容安全与合规审核技术的公司,成立于中国,致力于为各类企业提供智能化的内容审核和风控服务。数美主要通过人工智能技术为互联网平台、社交媒体、电商、广告等领域提供专业的内容审核解决方案,确保平台的内容安全、合规,并维护良好的用户体验。

4.1 核心服务

  1. 内容安全审核

    • 数美通过先进的机器学习和自然语言处理技术,提供对图片、视频、文本、音频等多媒体内容的审核服务,识别并过滤非法、不良或敏感内容,如暴力、色情、涉政、仇恨言论等。其审核系统可以实时处理大量内容,确保内容的及时性和安全性。
  2. AI 智能识别

    • 数美的信息审核系统基于深度学习和计算机视觉等 AI 技术,能够识别图片和视频中的违法元素,文本中的敏感词汇,甚至音频中的敏感语句。它结合大规模训练数据和不断迭代的算法,有效提升内容审核的精度和覆盖面。
  3. 内容风控

    • 除了审核不良内容,数美还提供内容风控服务,帮助企业监控和应对内容风险。通过数据分析和智能风控引擎,预警潜在风险内容,并提供相应的合规建议,帮助企业降低运营风险。
  4. 定制化服务

    • 数美为不同行业提供定制化的内容审核解决方案。无论是社交媒体平台、电商网站、广告投放平台,还是教育、游戏等垂直领域,数美都能够根据行业特点和法规要求,定制适合的内容审核策略与风控模型。
  5. 人工与 AI 结合

    • 在数美的内容审核服务中,AI 自动审核和人工复核相结合。对于复杂或边界模糊的内容,AI 系统将自动筛选出可能存在问题的内容,并交由人工审核员进行进一步复核,从而保证审核的准确性和可靠性。

4.2 技术优势

  1. 高效的内容审核引擎

    • 数美依靠其高效的算法和分布式架构,能够支持海量数据的实时处理,适用于内容高频更新的大型互联网平台。同时,审核系统可根据不同内容类别进行分类处理,提升审核的效率与精度。
  2. 多媒体审核覆盖

    • 数美不仅支持对文本内容的审核,还包括图片、视频和音频等多媒体内容的审核,适用于各类场景,如社交平台的用户生成内容(UGC)审核、电商平台的商品图片审核、广告内容的合规性审核等。
  3. 高准确率与低误判率

    • 得益于深度学习模型的不断优化与海量数据的训练,数美的 AI 审核系统在内容审核中的识别准确率非常高,同时误判率低。这对需要大规模审核内容的平台尤为重要,有效降低人工审核成本的同时保证合规性。
  4. 灵活的部署方式

    • 数美的信息审核服务可以根据客户的需求灵活部署,支持公有云、私有云、以及本地化部署。企业可以根据自身需求和合规要求,选择适合的部署方案。

4.3 常见应用场景

  1. 社交媒体:社交平台需要对用户上传的文字、图片、视频等内容进行严格审核,以防止不良内容的传播。数美的审核系统帮助这些平台实现内容合规和社区安全管理。

  2. 电商平台:商品描述、图片等内容审核是电商平台面临的挑战。数美可以帮助电商平台审核商品是否包含违规内容,确保广告合规性及用户体验。

  3. 在线广告:广告审核是一个敏感领域,广告内容需要符合国家法律法规和道德标准。数美帮助广告投放平台在大规模投放前快速审核广告的合规性。

  4. 视频与直播平台:视频与直播平台面临大量实时内容的审核需求。数美的审核系统能够实时检测和过滤违规内容,确保平台的合规运营。

4.4 总结

科技作为内容安全和合规领域的专家,依靠先进的人工智能技术,提供全面的内容审核和风险控制服务。通过与平台的无缝集成,数美帮助企业在保障内容合规和提升用户体验的同时,减少违规内容带来的法律和声誉风险。

5、我能做什么

使用七牛和数美来实现图片的合规审核,可以分为两步:首先将图片上传到七牛云,然后使用数美的内容审核服务对图片进行合规性检测

当然其他内容格式也是大同小异,比如文本,音频,视频==,不局限于图片

5.1. 七牛云图片上传

首先使用七牛云 SDK 将图片上传到存储空间中。你可以参考之前的七牛上传代码,将图片上传至七牛云。

5.2. 调用数美审核图片

上传完成后,通过调用数美的内容审核 API,对图片进行合规性审核。数美的审核服务通常会包括对色情、暴力、政治敏感内容等的检测。

5.3 步骤详解

5.3.1 安装依赖库

确保安装了七牛云的 SDK 和 HTTP 请求库(如 requests)。

pip install qiniu requests
5.3.2 上传图片到七牛云

使用 put_file 函数,将图片上传到七牛云。

5.3.3 调用数美内容审核 API

假设你已经获得了数美的 API 密钥,可以使用 requests 库来调用审核接口。一般审核的步骤如下:

  • 获取图片的 URL(存储在七牛云上)。
  • 使用 HTTP 请求发送图片 URL 给数美审核 API。
  • 获取审核结果,判断图片是否合规。

5.4 代码示例

import os
import requests
from qiniu import Auth, put_file
from PIL import Image
from io import BytesIO

# 上传图片到七牛云
def upload_file_to_qiniu(file_path, bucket_name, file_key):
    # 七牛云配置
    access_key = 'your_access_key'
    secret_key = 'your_secret_key'
    q = Auth(access_key, secret_key)

    # 获取上传Token
    token = q.upload_token(bucket_name, file_key, 3600)

    # 上传文件
    ret, info = put_file(token, file_key, file_path)

    # 检查上传结果
    if info.status_code == 200:
        return f"http://{bucket_name}.your_domain.com/{file_key}"
    else:
        print(f"上传失败: {info}")
        return None

# 调用数美审核API
def review_image_with_meisu(image_url):
    api_url = "https://api.meisuapi.com/v1/image/review"
    headers = {
        "Authorization": "Bearer your_meisu_api_key",  # 替换为你的数美API密钥
        "Content-Type": "application/json"
    }
    payload = {
        "url": image_url,
        "type": "porn,violence,politics"  # 可根据需要调整审核类型
    }

    response = requests.post(api_url, json=payload, headers=headers)

    if response.status_code == 200:
        review_data = response.json()
        if review_data["code"] == 0:
            return review_data["data"]
        else:
            print(f"审核失败: {review_data}")
            return None
    else:
        print(f"HTTP 请求失败: {response.status_code}")
        return None

# 示例调用
bucket_name = "your_bucket_name"
file_path = "path_to_your_image.jpg"
file_key = os.path.basename(file_path)

# 上传图片到七牛云
image_url = upload_file_to_qiniu(file_path, bucket_name, file_key)

if image_url:
    print(f"图片已上传到七牛云: {image_url}")
    
    # 调用数美进行内容审核
    review_result = review_image_with_meisu(image_url)
    
    if review_result:
        print("审核结果: ", review_result)
    else:
        print("审核失败")
else:
    print("图片上传失败")

5.5 代码说明

  1. upload_file_to_qiniu:上传图片到七牛云,返回图片的 URL。
  2. review_image_with_meisu:将图片 URL 发送给数美的审核接口,审核内容类型可以根据需求定制(例如色情、暴力、政治等)。
  3. API 密钥管理:请确保使用安全的方式存储和管理七牛云和数美的 API 密钥。

5.6 审核结果处理

审核结果通常会返回一组包含审核状态、内容类别以及得分的 JSON 数据。你可以根据这些结果来判断图片是否合规,并采取相应的措施(如阻止不合规图片的使用,或者将不合规图片移动到专门的文件夹中)。

5.7 提示

  • 七牛和数美的 API 请求都有速率限制,因此在大量图片审核时,请注意控制请求频率,或者结合多线程、异步处理进行优化。
  • 图片 URL 必须是公开的才能让数美的审核服务访问到,因此需要确保图片 URL 没有访问权限问题。

通过将图片存储与内容审核服务结合,你可以构建一个自动化的图片合规检测系统。

6、项目demo实现

import requests
import json
import os
import shutil
import time
import pandas as pd
from qiniu import put_file
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
from io import BytesIO

# 定义一个空的列表,用于存储每个图片的处理时间数据
timing_data = []

def compress_image(file_path, max_size=256 * 1024, quality=50):
    """
    压缩图片,保持文件大小不超过 max_size(字节)。
    使用 Pillow 库进行压缩。
    """
    # 提前检查文件大小,避免不必要的操作
    file_size = os.path.getsize(file_path)
    
    if file_size <= max_size:
        # 如果文件大小已经小于 max_size,直接返回原路径
        return file_path

    # 打开图片
    img = Image.open(file_path)
    
    # 检查是否是 RGBA 模式,如果是则转换为 RGB 模式
    if img.mode == 'RGBA':
        img = img.convert('RGB')
    
    output = BytesIO()

    # 初始压缩
    img.save(output, format="JPEG", quality=quality)
    output_size = output.tell()

    # 调整质量以继续压缩,直到文件大小小于 max_size
    while output_size > max_size and quality > 10:
        quality -= 5
        output = BytesIO()  # 重置 BytesIO 对象
        img.save(output, format="JPEG", quality=quality)
        output_size = output.tell()

    # 保存压缩后的图片到临时路径
    temp_compressed_path = file_path + ".compressed.jpg"
    with open(temp_compressed_path, "wb") as f:
        f.write(output.getvalue())

    return temp_compressed_path

def get_upload_token(fileName="test.txt"):
    url = "替换实际七牛URL"
    headers = {'Content-Type': 'application/json'}

    data = {
        "appKey": "app-screenshot-review",
        "fileName": fileName
    }

    try:
        # 发送 POST 请求
        response = requests.post(url, headers=headers, data=json.dumps(data))
        response.raise_for_status()  # 会自动抛出 HTTPError 异常

        # 解析响应
        response_data = response.json()
        data = response_data.get('data', {})
        cached_token = data.get('token')
        cached_key = data.get('resourceName')

        if cached_token and cached_key:
            return cached_token, cached_key
        else:
            print("Error: Upload token or key is missing in the response.")
            return "", ""
    except requests.RequestException as e:
        print(f"RequestException: {e}")
        return "", ""
    except json.JSONDecodeError:
        print("Error: Failed to decode JSON response.")
        return "", ""

def upload_file(upload_token, key, filePath):
    try:
        # 上传文件
        ret, info = put_file(up_token=upload_token, key=key, file_path=filePath, version='v2')

        # 检查上传是否成功
        if info.status_code == 200:
            # 获取文件 URL
            file_url = ret.get('data', {}).get('url')
            if file_url:
                # 将 URL 从 HTTPS 转换为 HTTP
                return file_url.replace("https://", "http://")
            else:
                print("Error: 'url' not found in the response data.")
                return ""
        else:
            print(f"Error: Upload failed with status code {info.status_code}.")
            return ""
    except Exception as e:
        print(f"Exception during file upload: {e}")
        return ""

def check_pic_pass(file_url):
    url = "替换实际数美URL"
    # 构造请求参数
    data = {
        "account": "servertest",
        "packageName": "com.xxx.xxx",
        "imageUrl": file_url
    }
    
    try:
        response = requests.post(url, data=data)
        if response.status_code == 200:
            response_data = response.json()
            print(f"Response data: {response_data}")
            reviewMsg = response_data.get('data', {}).get('reviewMsg')
            return reviewMsg
        else:
            print("Failed to call API. Status code:%s", response.status_code)
        return ""
        
    except Exception as e:
        print("Exception:", e)
        return ""

def process_single_image(file_path, error_folder):
    filename = os.path.basename(file_path)
    
    # 记录开始时间
    start_time = time.time()
    
    # 记录压缩图片的开始时间
    compress_start_time = time.time()
    
    # 压缩大于1MB的图片
    file_path = compress_image(file_path)
    
    # 记录压缩图片的结束时间并打印耗时
    compress_end_time = time.time()
    compress_time = compress_end_time - compress_start_time
    print(f"Step 0 - 压缩图片'{filename}': {compress_time:.2f} 秒")

    # 1、获取上传 Token 和 Key
    token_start_time = time.time()
    token, key = get_upload_token(file_path)
    token_end_time = time.time()
    token_time = token_end_time - token_start_time
    print(f"Step 1 - 获取上传 Token 和 Key'{filename}': {token_time:.2f} 秒")
    
    # 2、上传图片
    upload_start_time = time.time()
    file_url = upload_file(upload_token=token, key=key, filePath=file_path)
    upload_end_time = time.time()
    upload_time = upload_end_time - upload_start_time
    print(f"Step 2 - 上传图片'{filename}': {upload_time:.2f} 秒")
    
    # 3、检查图片
    check_start_time = time.time()
    review_msg = check_pic_pass(file_url)
    check_end_time = time.time()
    check_time = check_end_time - check_start_time
    print(f"Step 3 - 检查图片'{filename}': {check_time:.2f} 秒")
    
    # 4、处理异常图片
    error_check_start_time = time.time()
    if review_msg is not None and '正常' not in review_msg:
        shutil.copy(file_path, os.path.join(error_folder, filename))
    error_check_end_time = time.time()
    error_check_time = error_check_end_time - error_check_start_time
    print(f"Step 4 - 处理异常图片'{filename}': {error_check_time:.2f} 秒")
    
    # 删除临时压缩文件(如果存在)
    if file_path.endswith(".compressed.jpg"):
        os.remove(file_path)
    
    # 计算总时间
    total_time = time.time() - start_time
    print(f"Total time for processing '{filename}': {total_time:.2f} 秒")
    
    # 将每个阶段的时间数据保存到列表中
    timing_data.append({
        '文件名': filename,
        '压缩耗时(秒)': compress_time,
        '获取Token耗时(秒)': token_time,
        '上传耗时(秒)': upload_time,
        '检查耗时(秒)': check_time,
        '处理异常图片耗时(秒)': error_check_time,
        '总耗时(秒)': total_time
    })
    
    # 返回检测结果用于后续的保存
    return {'图片名': filename, '检测结果': review_msg}

# 在所有图片处理完成后,将 timing_data 保存到一个 Excel 文件
def save_timing_data_to_excel(output_excel_name="timing_data.xlsx", image_folder=None):
    if image_folder is None:
        # 如果未指定图片文件夹,使用当前工作目录
        image_folder = os.getcwd()
    
    # 生成保存路径,将 Excel 文件保存到图片文件夹中
    output_excel_path = os.path.join(image_folder, output_excel_name)
    
    # 将 timing_data 保存为 Excel 文件
    df = pd.DataFrame(timing_data)
    df.to_excel(output_excel_path, index=False, engine='openpyxl')
    print(f"Timing data saved to {output_excel_path}")

def process_images_in_folder(input_folder, error_folder, result_file, max_workers=10):
    if not os.path.exists(error_folder):
        os.makedirs(error_folder)

    data = []
    images = [os.path.join(input_folder, filename) for filename in os.listdir(input_folder) 
              if filename.lower().endswith(('.png', '.jpg', '.jpeg'))]

    # 使用 ThreadPoolExecutor 进行并发处理
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_single_image, file_path, error_folder) for file_path in images]
        
        # 收集并保存结果
        for future in futures:
            result = future.result()
            data.append(result)

    df = pd.DataFrame(data)
    df.to_excel(result_file, index=False)
    
    save_timing_data_to_excel("timing_data.xlsx", input_folder)

def process_images_in_folder_(input_folder, error_folder, result_file):
    if not os.path.exists(error_folder):
        os.makedirs(error_folder)

    data = []

    # 遍历文件夹中的所有图片
    for filename in os.listdir(input_folder):
        if filename.lower().endswith(('.png', '.jpg',

 '.jpeg')):
            file_path = os.path.join(input_folder, filename)
            
            # 1、获取上传 Token 和 Key
            token, key = get_upload_token(file_path)
            
            # 2、上传图片
            file_url = upload_file(upload_token=token, key=key, filePath=file_path)
            
            # 3、检查图片
            review_msg = check_pic_pass(file_url)
            
            # 4、保存检测结果
            data.append({
                '图片名': filename,
                '检测结果': review_msg
            })

            # 如果有异常,将图片拷贝到错误文件夹
            if review_msg is not None and '正常' not in review_msg:
                shutil.copy(file_path, os.path.join(error_folder, filename))

    # 将检测结果保存到 Excel 文件
    df = pd.DataFrame(data)
    df.to_excel(result_file, index=False)

# 检查指定目录下的图片,并将异常图片移动到错误目录,同时生成检测结果的 Excel 文件
if __name__ == "__main__":
    # 定义输入文件夹路径,其中包含要处理的图片
    input_folder = 'F:\\Work\\content-check\\result\\文本测试'
    # 定义错误文件夹路径,异常图片将被移动到这里
    error_folder = 'F:\\Work\\content-check\\result\\文本测试\\异常'
    # 定义结果文件路径,将生成一个包含检测结果的 Excel 文件
    result_file = 'F:\\Work\\content-check\\result\\文本测试\\检测结果.xlsx'
    
    # 调用函数,对指定文件夹中的图片进行处理
    process_images_in_folder(input_folder, error_folder, result_file, 8)

6.1 功能介绍

脚本结合了七牛云和内容审核服务,用于批量处理图片并检查图片的合规性。脚本的基本流程如下:

  1. 图片压缩:如果图片大于 256KB,使用 Pillow 库进行压缩,确保文件大小不超过限制。
  2. 上传图片:通过七牛云 API 获取上传 token 并上传图片,返回图片 URL。
  3. 审核图片:使用内容审核服务(数美)对上传后的图片进行合规性检查。
  4. 处理异常图片:如果图片未通过审核,则将其复制到指定的错误文件夹。
  5. 记录与保存数据:保存处理时间数据到 Excel 文件,并生成检测结果的 Excel 文件。

此流程还通过多线程 (ThreadPoolExecutor) 来并发处理图片,提高效率。以下是主要功能的简要概述:

6.2 核心功能

  • 压缩图片 (compress_image):通过降低 JPEG 质量压缩图片大小,直到符合限制。
  • 获取上传 Token (get_upload_token):调用 API 获取七牛云的上传 Token。
  • 上传文件 (upload_file):使用七牛云 SDK 将图片上传到云端。
  • 图片审核 (check_pic_pass):调用数美 API 对上传的图片进行合规性审核,返回审核结果。
  • 批量处理 (process_images_in_folder):对文件夹中的图片进行并发处理,记录并保存处理时间及审核结果。

6.3 优化与并发

  • 使用 ThreadPoolExecutor 实现并发处理,能够显著提升处理速度,特别是在大量图片的场景下。
  • 每张图片的处理时间数据被记录并保存在 Excel 文件中,方便后续分析与优化。

6.4 测试过程

6.4.1 处理数据【130+张图】

在这里插入图片描述

6.4.2 合规检测结果【部分】

在这里插入图片描述

6.5 上传图片耗时优化【图片压缩】

6.5.1 上传图片耗时优化前【部分】

在这里插入图片描述

6.5.2 上传图片耗时优化后【部分】

在这里插入图片描述

6.5.3 上传图片耗时优化前后对比

在这里插入图片描述
在这里插入图片描述

优化【图片先压缩,再上传服务器】后成果:优化前12分钟+,优化后再6分钟之内,效率提升至少50%

此脚本非常适合在内容合规审核项目中使用,尤其是需要对大量图片进行批量处理的场景。如果你需要进一步优化,使用更高效的策略。如果有任何问题或需要进一步改进,欢迎留言指正和交流,感谢!


欢迎点赞|关注|收藏|评论,您的肯定是我创作的动力

欢迎点赞|关注|收藏|评论,您的肯定是我创作的动力

在这里插入图片描述

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐