以下是请求OPENAI接口的java版本和python,增加请求超时重试3次的逻辑

java 版本

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class OpenAIRequest {
    private static final String OPENAI_API_URL = "https://api.openai.com/v1/your_endpoint";
    private static final String API_KEY = "your_api_key";
    private static final int TIMEOUT = 10;
    private static final int RETRY_COUNT = 3;
    private static final int THREAD_POOL_SIZE = 10;

    private final OkHttpClient httpClient;
    private final ExecutorService executorService;

    public OpenAIRequest() {
        OkHttpClient.Builder builder = new OkHttpClient.Builder()
                .connectTimeout(TIMEOUT, TimeUnit.SECONDS)
                .writeTimeout(TIMEOUT, TimeUnit.SECONDS)
                .readTimeout(TIMEOUT, TimeUnit.SECONDS);

        // 设置代理
        Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("your_proxy_host", your_proxy_port));
        builder.proxy(proxy);

        httpClient = builder.build();
        executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    }

    public void requestOpenAI() {
        executorService.submit(() -> {
            int retries = 0;
            boolean success = false;

            while (retries < RETRY_COUNT && !success) {
                try {
                    Request request = new Request.Builder()
                            .url(OPENAI_API_URL)
                            .header("Authorization", "Bearer " + API_KEY)
                            .build();

                    httpClient.newCall(request).enqueue(new Callback() {
                        @Override
                        public void onFailure(Call call, IOException e) {
                            System.out.println("请求失败: " + e.getMessage());
                        }

                        @Override
                        public void onResponse(Call call, Response response) throws IOException {
                            if (response.isSuccessful()) {
                                System.out.println("请求成功: " + response.body().string());
                                success = true;
                            } else {
                                System.out.println("请求失败: " + response.message());
                            }
                        }
                    });

                    success = true;
                } catch (Exception e) {
                    retries++;
                    System.out.println("重试次数: " + retries);
                }
            }
        });
    }

    public static void main(String[] args) {
        OpenAIRequest openAIRequest = new OpenAIRequest();
        openAIRequest.requestOpenAI();
    }
}

注意,需要替换`OPENAI_API_URL`、`API_KEY`、`your_proxy_host`和`your_proxy_port`为您的实际值。同时,需要添加OkHttp库作为依赖项。

<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>3.6.0</version>
</dependency>

Python 版本

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class OpenAIRequester:
    def __init__(self, api_key, proxy=None, max_workers=5):
        self.api_key = api_key
        self.proxy = proxy
        self.max_workers = max_workers
        self.session = self._create_session()

    def _create_session(self):
        session = requests.Session()
        if self.proxy:
            session.proxies.update(self.proxy)

        retries = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retries)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        return session

    def _request_openai(self, url, data):
        headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }
        try:
            response = self.session.post(url, json=data, headers=headers, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            return None

    def request(self, urls, data_list):
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self._request_openai, url, data) for url, data in zip(urls, data_list)]

            results = []
            for future in as_completed(futures):
                result = future.result()
                if result:
                    results.append(result)

        return results

# 使用示例
if __name__ == "__main__":
    api_key = "your_openai_api_key"
    proxy = {
        "http": "http://proxy.example.com:8080",
        "https": "https://proxy.example.com:8080"
    }
    requester = OpenAIRequester(api_key, proxy)

    urls = ["https://api.openai.com/v1/engines/davinci-codex/completions"] * 3
    data_list = [
        {
            "prompt": "Python code to calculate the factorial of a number",
            "max_tokens": 100,
            "n": 1,
            "stop": None,
            "temperature": 0.5,
        }
    ] * 3

    results = requester.request(urls, data_list)
    print(results)

将`your_openai_api_key`替换为您的OpenAI API密钥,并根据需要设置代理。这个类将使用线程池并发地请求OpenAI接口,并在遇到超时或其他错误时自动重试3次

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐