fix: stabilize Codex continuation and proxy retry recovery / 修复 Codex 续传与代理重试稳定性 (#257)

* wip: preserve local changes before upstream merge

* Improve proxy routing stability and add stable-first strategy

* Sync explicit-group routing strategy to eligible source routes

* Fix proxy retry and runtime health review issues

* fix: make proxy channel retry attempts configurable

* fix: harden proxy retry recovery and bookkeeping

* fix: address PR #257 review comments

---------

Co-authored-by: Cita <juricek.chen@gmail.com>
This commit is contained in:
Bainily
2026-03-23 19:17:22 +08:00
committed by GitHub
parent 5be5e2eb07
commit 1449f08e6c
36 changed files with 1802 additions and 656 deletions
+1
View File
@@ -108,6 +108,7 @@ export function buildConfig(env: NodeJS.ProcessEnv) {
requestBodyLimit: DEFAULT_REQUEST_BODY_LIMIT,
routingFallbackUnitCost: Math.max(1e-6, parseNumber(env.ROUTING_FALLBACK_UNIT_COST, 1)),
tokenRouterCacheTtlMs: Math.max(100, Math.trunc(parseNumber(env.TOKEN_ROUTER_CACHE_TTL_MS, 1_500))),
proxyMaxChannelAttempts: Math.max(1, Math.trunc(parseNumber(env.PROXY_MAX_CHANNEL_ATTEMPTS, 3))),
proxyLogRetentionDays: Math.max(0, Math.trunc(parseNumber(env.PROXY_LOG_RETENTION_DAYS, 30))),
proxyLogRetentionPruneIntervalMinutes: Math.max(1, Math.trunc(parseNumber(env.PROXY_LOG_RETENTION_PRUNE_INTERVAL_MINUTES, 30))),
proxyFileRetentionDays: Math.max(0, Math.trunc(parseNumber(env.PROXY_FILE_RETENTION_DAYS, 30))),
+23 -8
View File
@@ -41,6 +41,7 @@ import {
} from '../../routes/proxy/geminiCliCompat.js';
import { summarizeConversationFileInputsInOpenAiBody } from '../capabilities/conversationFileCapabilities.js';
import { detectDownstreamClientContext } from '../../routes/proxy/downstreamClientContext.js';
import { canRetryProxyChannel, getProxyMaxChannelRetries } from '../../services/proxyChannelRetry.js';
import {
createSurfaceFailureToolkit,
createSurfaceDispatchRequest,
@@ -49,8 +50,6 @@ import {
trySurfaceOauthRefreshRecovery,
} from './sharedSurface.js';
const MAX_RETRIES = 2;
/** Narrows an unknown value to a plain (non-array) object record. */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || typeof value !== 'object') return false;
  return !Array.isArray(value);
}
@@ -59,6 +58,17 @@ function asTrimmedString(value: unknown): string {
return typeof value === 'string' ? value.trim() : '';
}
/**
 * Moves `preferred` to the front of the candidate list, keeping the relative
 * order of the remaining endpoints. Returns the input array unchanged (same
 * reference) when the preferred endpoint is not among the candidates.
 */
function prioritizeEndpointCandidate(
  candidates: Array<'chat' | 'messages' | 'responses'>,
  preferred: 'chat' | 'messages' | 'responses',
): Array<'chat' | 'messages' | 'responses'> {
  const hasPreferred = candidates.some((candidate) => candidate === preferred);
  if (!hasPreferred) return candidates;
  const remaining = candidates.filter((candidate) => candidate !== preferred);
  return [preferred, ...remaining];
}
export async function handleChatSurfaceRequest(
request: FastifyRequest,
reply: FastifyReply,
@@ -108,10 +118,11 @@ export async function handleChatSurfaceRequest(
proxyToken: getProxyAuthContext(request)?.token || null,
});
const downstreamApiKeyId = getProxyAuthContext(request)?.keyId ?? null;
const maxRetries = getProxyMaxChannelRetries();
const failureToolkit = createSurfaceFailureToolkit({
warningScope: 'chat',
downstreamPath,
maxRetries: MAX_RETRIES,
maxRetries,
clientContext,
downstreamApiKeyId,
});
@@ -119,7 +130,7 @@ export async function handleChatSurfaceRequest(
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= MAX_RETRIES) {
while (retryCount <= maxRetries) {
const selected = await selectSurfaceChannelForAttempt({
requestedModel,
downstreamPolicy,
@@ -142,7 +153,7 @@ export async function handleChatSurfaceRequest(
const modelName = selected.actualModel || requestedModel;
const oauth = getOauthInfoFromAccount(selected.account);
const isCodexSite = String(selected.site.platform || '').trim().toLowerCase() === 'codex';
const endpointCandidates = [
let endpointCandidates = [
...await resolveUpstreamEndpointCandidates(
{
site: selected.site,
@@ -157,6 +168,9 @@ export async function handleChatSurfaceRequest(
},
),
];
if (oauth?.provider === 'codex' && downstreamFormat === 'openai') {
endpointCandidates = prioritizeEndpointCandidate(endpointCandidates, 'responses');
}
const endpointRuntimeContext = {
siteId: selected.site.id,
modelName,
@@ -624,17 +638,18 @@ export async function handleClaudeCountTokensSurfaceRequest(
});
const downstreamPolicy = getDownstreamRoutingPolicy(request);
const downstreamApiKeyId = getProxyAuthContext(request)?.keyId ?? null;
const maxRetries = getProxyMaxChannelRetries();
const failureToolkit = createSurfaceFailureToolkit({
warningScope: 'chat',
downstreamPath,
maxRetries: MAX_RETRIES,
maxRetries,
clientContext,
downstreamApiKeyId,
});
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= MAX_RETRIES) {
while (retryCount <= maxRetries) {
const selected = await selectSurfaceChannelForAttempt({
requestedModel,
downstreamPolicy,
@@ -664,7 +679,7 @@ export async function handleClaudeCountTokensSurfaceRequest(
requestedModel,
);
if (!endpointCandidates.includes('messages')) {
if (retryCount < MAX_RETRIES) {
if (canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
+28 -15
View File
@@ -36,8 +36,7 @@ import { detectDownstreamClientContext, type DownstreamClientContext } from '../
import { insertProxyLog } from '../../services/proxyLogStore.js';
import { summarizeConversationFileInputsInOpenAiBody } from '../capabilities/conversationFileCapabilities.js';
import { readRuntimeResponseText } from '../executors/types.js';
const MAX_RETRIES = 2;
import { canRetryProxyChannel, getProxyMaxChannelRetries } from '../../services/proxyChannelRetry.js';
const GEMINI_MODEL_PROBES = [
'gemini-2.5-flash',
'gemini-2.0-flash',
@@ -253,6 +252,18 @@ async function logProxy(
}
}
/**
 * Best-effort success bookkeeping for a Gemini channel: records latency for
 * `channelId` against `modelName` on the token router, logging (never
 * throwing) on failure so stats can't break the proxy response path.
 * Cost is recorded as 0 here.
 */
async function recordGeminiChannelSuccessBestEffort(
  channelId: number,
  latencyMs: number,
  modelName: string,
): Promise<void> {
  // recordSuccess is optional on the router; skip silently when absent.
  const record = tokenRouter.recordSuccess;
  if (!record) return;
  try {
    await record.call(tokenRouter, channelId, latencyMs, 0, modelName);
  } catch (error) {
    console.warn('[proxy/gemini] failed to record channel success', error);
  }
}
export async function geminiProxyRoute(app: FastifyInstance) {
const listModels = async (request: FastifyRequest, reply: FastifyReply) => {
const apiVersion = geminiGenerateContentTransformer.resolveProxyApiVersion(
@@ -264,7 +275,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
let lastText = 'No available channels for Gemini models';
let lastContentType = 'application/json';
while (retryCount <= MAX_RETRIES) {
while (retryCount <= getProxyMaxChannelRetries()) {
const selected = retryCount === 0
? await selectGeminiChannel(request)
: await selectNextGeminiProbeChannel(request, excludeChannelIds);
@@ -305,10 +316,11 @@ export async function geminiProxyRoute(app: FastifyInstance) {
status: upstream.status,
errorText: text,
});
if (retryCount < MAX_RETRIES) {
if (canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
return reply.code(lastStatus).type(lastContentType).send(lastText);
}
try {
@@ -330,10 +342,11 @@ export async function geminiProxyRoute(app: FastifyInstance) {
type: 'upstream_error',
},
});
if (retryCount < MAX_RETRIES) {
if (canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
return reply.code(lastStatus).type(lastContentType).send(lastText);
}
}
};
@@ -392,7 +405,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
let lastText = 'No available channels for this model';
let lastContentType = 'application/json';
while (retryCount <= MAX_RETRIES) {
while (retryCount <= getProxyMaxChannelRetries()) {
const selected = retryCount === 0
? await tokenRouter.selectChannel(requestedModel, policy)
: await tokenRouter.selectNextChannel(requestedModel, excludeChannelIds, policy);
@@ -429,7 +442,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
status: 500,
errorText: 'Gemini CLI OAuth project is missing',
});
if (retryCount < MAX_RETRIES) {
if (canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
@@ -550,7 +563,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
upstreamPath,
clientContext,
);
if (retryCount < MAX_RETRIES) {
if (canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
@@ -572,7 +585,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
: upstreamReader;
if (!reader) {
const latency = Date.now() - startTime;
await tokenRouter.recordSuccess?.(selected.channel.id, latency, 0);
await recordGeminiChannelSuccessBestEffort(selected.channel.id, latency, actualModel);
await logProxy(
selected,
requestedModel,
@@ -622,7 +635,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
}
const parsedUsage = parseProxyUsage(aggregateState);
const latency = Date.now() - startTime;
await tokenRouter.recordSuccess?.(selected.channel.id, latency, 0);
await recordGeminiChannelSuccessBestEffort(selected.channel.id, latency, actualModel);
await logProxy(
selected,
requestedModel,
@@ -658,7 +671,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
);
parsedUsage = parseProxyUsage(aggregateState);
const latency = Date.now() - startTime;
await tokenRouter.recordSuccess?.(selected.channel.id, latency, 0);
await recordGeminiChannelSuccessBestEffort(selected.channel.id, latency, actualModel);
await logProxy(
selected,
requestedModel,
@@ -681,7 +694,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
);
} catch {
const latency = Date.now() - startTime;
await tokenRouter.recordSuccess?.(selected.channel.id, latency, 0);
await recordGeminiChannelSuccessBestEffort(selected.channel.id, latency, actualModel);
await logProxy(
selected,
requestedModel,
@@ -843,7 +856,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
null,
clientContext,
);
if (retryCount < MAX_RETRIES) {
if (canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
@@ -868,7 +881,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
},
});
const latency = Date.now() - startTime;
await tokenRouter.recordSuccess?.(selected.channel.id, latency, 0);
await recordGeminiChannelSuccessBestEffort(selected.channel.id, latency, actualModel);
await logProxy(
selected,
requestedModel,
@@ -920,7 +933,7 @@ export async function geminiProxyRoute(app: FastifyInstance) {
upstreamPath || null,
clientContext,
);
if (retryCount < MAX_RETRIES) {
if (canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
@@ -38,6 +38,7 @@ import {
summarizeConversationFileInputsInResponsesBody,
} from '../capabilities/conversationFileCapabilities.js';
import { detectDownstreamClientContext } from '../../routes/proxy/downstreamClientContext.js';
import { getProxyMaxChannelRetries } from '../../services/proxyChannelRetry.js';
import {
createSurfaceFailureToolkit,
createSurfaceDispatchRequest,
@@ -46,8 +47,6 @@ import {
trySurfaceOauthRefreshRecovery,
} from './sharedSurface.js';
const MAX_RETRIES = 2;
/**
 * Narrows an unknown value to an object record.
 * NOTE(review): unlike the chat-surface `isRecord`, arrays pass this check
 * (no `Array.isArray` exclusion) — confirm that is intentional here.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}
@@ -178,17 +177,18 @@ export async function handleOpenAiResponsesSurfaceRequest(
if (!await ensureModelAllowedForDownstreamKey(request, reply, requestedModel)) return;
const downstreamPolicy = getDownstreamRoutingPolicy(request);
const downstreamApiKeyId = getProxyAuthContext(request)?.keyId ?? null;
const maxRetries = getProxyMaxChannelRetries();
const failureToolkit = createSurfaceFailureToolkit({
warningScope: 'responses',
downstreamPath,
maxRetries: MAX_RETRIES,
maxRetries,
clientContext,
downstreamApiKeyId,
});
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= MAX_RETRIES) {
while (retryCount <= maxRetries) {
const selected = await selectSurfaceChannelForAttempt({
requestedModel,
downstreamPolicy,
@@ -690,6 +690,13 @@ describe('selectSurfaceChannelForAttempt', () => {
headers: { authorization: 'Bearer new-access-token' },
}), 'https://upstream.example.com/v1/responses');
expect(result).toEqual({
request: {
endpoint: 'responses',
path: '/v1/responses',
headers: { authorization: 'Bearer new-access-token' },
body: { model: 'gpt-5.2' },
},
targetUrl: 'https://upstream.example.com/v1/responses',
upstream: refreshedResponse,
upstreamPath: '/v1/responses',
});
@@ -222,6 +222,8 @@ export async function trySurfaceOauthRefreshRecovery<TRequest extends BuiltEndpo
}): Promise<{
upstream: Awaited<ReturnType<typeof dispatchRuntimeRequest>>;
upstreamPath: string;
request?: TRequest;
targetUrl?: string;
} | null> {
try {
const refreshed = await refreshOauthAccessTokenSingleflight(input.selected.account.id);
@@ -239,6 +241,8 @@ export async function trySurfaceOauthRefreshRecovery<TRequest extends BuiltEndpo
return {
upstream: refreshedResponse,
upstreamPath: refreshedRequest.path,
request: refreshedRequest,
targetUrl: refreshedTargetUrl,
};
}
@@ -244,6 +244,110 @@ describe('PUT /api/routes/:id route rebuild', () => {
]));
});
it('syncs explicit-group routing strategy to unique source routes', async () => {
await seedAccountWithToken('claude-opus-4-5');
await seedAccountWithToken('claude-sonnet-4-5');
const exactRouteA = await db.insert(schema.tokenRoutes).values({
modelPattern: 'claude-opus-4-5',
enabled: true,
routingStrategy: 'weighted',
}).returning().get();
const exactRouteB = await db.insert(schema.tokenRoutes).values({
modelPattern: 'claude-sonnet-4-5',
enabled: true,
routingStrategy: 'weighted',
}).returning().get();
const createResponse = await app.inject({
method: 'POST',
url: '/api/routes',
payload: {
routeMode: 'explicit_group',
displayName: 'claude-opus-4-6',
sourceRouteIds: [exactRouteA.id, exactRouteB.id],
routingStrategy: 'stable_first',
},
});
expect(createResponse.statusCode).toBe(200);
const refreshedRouteA = await db.select().from(schema.tokenRoutes)
.where(eq(schema.tokenRoutes.id, exactRouteA.id))
.get();
const refreshedRouteB = await db.select().from(schema.tokenRoutes)
.where(eq(schema.tokenRoutes.id, exactRouteB.id))
.get();
expect(refreshedRouteA?.routingStrategy).toBe('stable_first');
expect(refreshedRouteB?.routingStrategy).toBe('stable_first');
const groupRouteId = (createResponse.json() as { id: number }).id;
const updateResponse = await app.inject({
method: 'PUT',
url: `/api/routes/${groupRouteId}`,
payload: {
routingStrategy: 'round_robin',
},
});
expect(updateResponse.statusCode).toBe(200);
const updatedRouteA = await db.select().from(schema.tokenRoutes)
.where(eq(schema.tokenRoutes.id, exactRouteA.id))
.get();
const updatedRouteB = await db.select().from(schema.tokenRoutes)
.where(eq(schema.tokenRoutes.id, exactRouteB.id))
.get();
expect(updatedRouteA?.routingStrategy).toBe('round_robin');
expect(updatedRouteB?.routingStrategy).toBe('round_robin');
});
it('does not overwrite source routes shared by another explicit-group', async () => {
await seedAccountWithToken('claude-opus-4-5');
const sharedSourceRoute = await db.insert(schema.tokenRoutes).values({
modelPattern: 'claude-opus-4-5',
enabled: true,
routingStrategy: 'weighted',
}).returning().get();
const firstGroupResponse = await app.inject({
method: 'POST',
url: '/api/routes',
payload: {
routeMode: 'explicit_group',
displayName: 'claude-opus-4-6',
sourceRouteIds: [sharedSourceRoute.id],
routingStrategy: 'stable_first',
},
});
expect(firstGroupResponse.statusCode).toBe(200);
await db.update(schema.tokenRoutes).set({
routingStrategy: 'weighted',
}).where(eq(schema.tokenRoutes.id, sharedSourceRoute.id)).run();
const secondGroupResponse = await app.inject({
method: 'POST',
url: '/api/routes',
payload: {
routeMode: 'explicit_group',
displayName: 'claude-opus-4-6-alt',
sourceRouteIds: [sharedSourceRoute.id],
routingStrategy: 'round_robin',
},
});
expect(secondGroupResponse.statusCode).toBe(200);
const refreshedSharedRoute = await db.select().from(schema.tokenRoutes)
.where(eq(schema.tokenRoutes.id, sharedSourceRoute.id))
.get();
expect(refreshedSharedRoute?.routingStrategy).toBe('weighted');
});
it('fills missing sourceModel from source exact routes when loading explicit-group channels', async () => {
const sourceA = await seedAccountWithToken('deepseek-chat');
const sourceB = await seedAccountWithToken('deepseek-reasoner');
+101 -3
View File
@@ -6,7 +6,11 @@ import {
ACCOUNT_TOKEN_VALUE_STATUS_READY,
isUsableAccountToken,
} from '../../services/accountTokenService.js';
import { normalizeRouteRoutingStrategy } from '../../services/routeRoutingStrategy.js';
import {
DEFAULT_ROUTE_ROUTING_STRATEGY,
normalizeRouteRoutingStrategy,
type RouteRoutingStrategy,
} from '../../services/routeRoutingStrategy.js';
import { invalidateTokenRouterCache, matchesModelPattern, tokenRouter } from '../../services/tokenRouter.js';
import { startBackgroundTask } from '../../services/backgroundTaskService.js';
import {
@@ -133,6 +137,68 @@ async function replaceRouteSourceRouteIds(routeId: number, sourceRouteIds: numbe
).run();
}
/**
 * Propagates an explicit-group route's routing strategy onto its exact-match
 * source routes so they stay in step with the group that owns them.
 *
 * A source route is rewritten only when ALL of these hold:
 *  - it is not itself an explicit_group route and has an exact model pattern;
 *  - no OTHER explicit group also references it (shared sources are left alone);
 *  - its current strategy is the default, already equals the target, or equals
 *    the group's previous strategy — i.e. it does not look manually customized.
 *
 * @param input.groupRouteId     The explicit-group route performing the sync.
 * @param input.sourceRouteIds   Candidate source route ids (deduped/validated here).
 * @param input.targetStrategy   Strategy to write onto eligible source routes.
 * @param input.previousStrategy The group's strategy before this change, used to
 *                               recognize sources that merely mirrored the group.
 * @returns Ids of source routes whose strategy was actually rewritten; callers
 *          use this list to invalidate route decision snapshots.
 */
async function syncExplicitGroupSourceRouteStrategies(input: {
  groupRouteId: number;
  sourceRouteIds: number[];
  targetStrategy: RouteRoutingStrategy;
  previousStrategy?: RouteRoutingStrategy | null;
}): Promise<number[]> {
  // Dedupe and drop non-positive / non-finite ids up front.
  const normalizedSourceRouteIds = Array.from(new Set(
    input.sourceRouteIds.filter((routeId): routeId is number => Number.isFinite(routeId) && routeId > 0),
  ));
  if (normalizedSourceRouteIds.length === 0) return [];
  // Fetch the candidate source routes and every group-membership row pointing
  // at them in parallel.
  const [sourceRoutes, sourceGroupRows] = await Promise.all([
    db.select().from(schema.tokenRoutes)
      .where(inArray(schema.tokenRoutes.id, normalizedSourceRouteIds))
      .all(),
    db.select({
      groupRouteId: schema.routeGroupSources.groupRouteId,
      sourceRouteId: schema.routeGroupSources.sourceRouteId,
    }).from(schema.routeGroupSources)
      .where(inArray(schema.routeGroupSources.sourceRouteId, normalizedSourceRouteIds))
      .all(),
  ]);
  // Map each source route id to the set of OTHER groups referencing it; a
  // non-empty set means the route is shared and must not be overwritten.
  const otherGroupRefsBySourceRouteId = new Map<number, Set<number>>();
  for (const row of sourceGroupRows) {
    if (row.groupRouteId === input.groupRouteId) continue;
    if (!otherGroupRefsBySourceRouteId.has(row.sourceRouteId)) {
      otherGroupRefsBySourceRouteId.set(row.sourceRouteId, new Set());
    }
    otherGroupRefsBySourceRouteId.get(row.sourceRouteId)!.add(row.groupRouteId);
  }
  const previousStrategy = input.previousStrategy
    ? normalizeRouteRoutingStrategy(input.previousStrategy)
    : null;
  const updatableRouteIds: number[] = [];
  for (const route of sourceRoutes) {
    if (normalizeRouteMode(route.routeMode) === 'explicit_group') continue;
    if (!isExactModelPattern(route.modelPattern)) continue;
    // Shared with another explicit group — leave its strategy untouched.
    if ((otherGroupRefsBySourceRouteId.get(route.id)?.size || 0) > 0) continue;
    const currentStrategy = normalizeRouteRoutingStrategy(route.routingStrategy);
    const shouldSync = (
      currentStrategy === DEFAULT_ROUTE_ROUTING_STRATEGY
      || currentStrategy === input.targetStrategy
      || (previousStrategy !== null && currentStrategy === previousStrategy)
    );
    if (!shouldSync) continue;
    // Already at the target strategy — no write (and no snapshot invalidation).
    if (currentStrategy === input.targetStrategy) continue;
    updatableRouteIds.push(route.id);
  }
  if (updatableRouteIds.length === 0) return [];
  await db.update(schema.tokenRoutes).set({
    routingStrategy: input.targetStrategy,
    updatedAt: new Date().toISOString(),
  }).where(inArray(schema.tokenRoutes.id, updatableRouteIds)).run();
  return updatableRouteIds;
}
async function clearDependentExplicitGroupSnapshotsBySourceRouteIds(sourceRouteIds: number[]): Promise<void> {
const normalizedSourceRouteIds = Array.from(new Set(
sourceRouteIds.filter((routeId): routeId is number => Number.isFinite(routeId) && routeId > 0),
@@ -894,6 +960,7 @@ export async function tokensRoutes(app: FastifyInstance) {
const routeMode = normalizeRouteMode(body.routeMode);
const displayName = typeof body.displayName === 'string' ? body.displayName.trim() : '';
const sourceRouteIds = normalizeSourceRouteIdsInput(body.sourceRouteIds);
const normalizedRoutingStrategy = normalizeRouteRoutingStrategy(body.routingStrategy);
const modelPattern = routeMode === 'explicit_group'
? displayName
: (typeof body.modelPattern === 'string' ? body.modelPattern.trim() : '');
@@ -916,7 +983,7 @@ export async function tokensRoutes(app: FastifyInstance) {
displayIcon: body.displayIcon,
routeMode,
modelMapping: body.modelMapping,
routingStrategy: normalizeRouteRoutingStrategy(body.routingStrategy),
routingStrategy: normalizedRoutingStrategy,
enabled: body.enabled ?? true,
}).run();
const routeId = Number(insertedRoute.lastInsertRowid || 0);
@@ -930,6 +997,15 @@ export async function tokensRoutes(app: FastifyInstance) {
if (routeMode === 'explicit_group') {
await replaceRouteSourceRouteIds(route.id, sourceRouteIds);
const syncedRouteIds = await syncExplicitGroupSourceRouteStrategies({
groupRouteId: route.id,
sourceRouteIds,
targetStrategy: normalizedRoutingStrategy,
});
if (syncedRouteIds.length > 0) {
await clearRouteDecisionSnapshots(syncedRouteIds);
await clearDependentExplicitGroupSnapshotsBySourceRouteIds(syncedRouteIds);
}
} else {
await populateRouteChannelsByModelPattern(route.id, modelPattern);
}
@@ -954,6 +1030,8 @@ export async function tokensRoutes(app: FastifyInstance) {
let nextModelPattern = existingRoute.modelPattern;
let nextDisplayName = existingRoute.displayName ?? '';
let nextSourceRouteIds = existingRoute.sourceRouteIds;
const previousRoutingStrategy = normalizeRouteRoutingStrategy(existingRoute.routingStrategy);
let nextRoutingStrategy = previousRoutingStrategy;
if (body.displayName !== undefined) {
nextDisplayName = String(body.displayName || '').trim();
@@ -978,7 +1056,10 @@ export async function tokensRoutes(app: FastifyInstance) {
updates.modelPattern = nextModelPattern;
}
if (body.modelMapping !== undefined) updates.modelMapping = body.modelMapping;
if (body.routingStrategy !== undefined) updates.routingStrategy = normalizeRouteRoutingStrategy(body.routingStrategy);
if (body.routingStrategy !== undefined) {
nextRoutingStrategy = normalizeRouteRoutingStrategy(body.routingStrategy);
updates.routingStrategy = nextRoutingStrategy;
}
if (body.enabled !== undefined) updates.enabled = body.enabled;
if (body.routeMode !== undefined) updates.routeMode = routeMode;
updates.updatedAt = new Date().toISOString();
@@ -987,6 +1068,19 @@ export async function tokensRoutes(app: FastifyInstance) {
if (routeMode === 'explicit_group' && body.sourceRouteIds !== undefined) {
await replaceRouteSourceRouteIds(id, nextSourceRouteIds);
}
const shouldSyncExplicitGroupSources = (
routeMode === 'explicit_group'
&& (body.routingStrategy !== undefined || body.sourceRouteIds !== undefined)
);
let syncedSourceRouteIds: number[] = [];
if (shouldSyncExplicitGroupSources) {
syncedSourceRouteIds = await syncExplicitGroupSourceRouteStrategies({
groupRouteId: id,
sourceRouteIds: nextSourceRouteIds,
targetStrategy: nextRoutingStrategy,
previousStrategy: previousRoutingStrategy,
});
}
const modelPatternChanged = nextModelPattern !== existingRoute.modelPattern;
const routeBehaviorChanged = modelPatternChanged
|| (routeMode === 'explicit_group' && body.sourceRouteIds !== undefined)
@@ -1000,6 +1094,10 @@ export async function tokensRoutes(app: FastifyInstance) {
await clearRouteDecisionSnapshot(id);
await clearDependentExplicitGroupSnapshotsBySourceRouteIds([id]);
}
if (syncedSourceRouteIds.length > 0) {
await clearRouteDecisionSnapshots(syncedSourceRouteIds);
await clearDependentExplicitGroupSnapshotsBySourceRouteIds(syncedSourceRouteIds);
}
invalidateTokenRouterCache();
return await getRouteWithSources(id);
});
+56 -41
View File
@@ -18,8 +18,7 @@ import { getProxyAuthContext } from '../../middleware/auth.js';
import { buildUpstreamUrl } from './upstreamUrl.js';
import { detectDownstreamClientContext, type DownstreamClientContext } from './downstreamClientContext.js';
import { insertProxyLog } from '../../services/proxyLogStore.js';
const MAX_RETRIES = 2;
import { canRetryProxyChannel, getProxyMaxChannelRetries } from '../../services/proxyChannelRetry.js';
export async function completionsProxyRoute(app: FastifyInstance) {
app.post('/v1/completions', async (request: FastifyRequest, reply: FastifyReply) => {
@@ -42,7 +41,7 @@ export async function completionsProxyRoute(app: FastifyInstance) {
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= MAX_RETRIES) {
while (retryCount <= getProxyMaxChannelRetries()) {
let selected = retryCount === 0
? await tokenRouter.selectChannel(requestedModel, downstreamPolicy)
: await tokenRouter.selectNextChannel(requestedModel, excludeChannelIds, downstreamPolicy);
@@ -61,12 +60,13 @@ export async function completionsProxyRoute(app: FastifyInstance) {
error: { message: 'No available channels for this model', type: 'server_error' },
});
}
excludeChannelIds.push(selected.channel.id);
const targetUrl = buildUpstreamUrl(selected.site.url, '/v1/completions');
const forwardBody = { ...body, model: selected.actualModel };
const startTime = Date.now();
excludeChannelIds.push(selected.channel.id);
const targetUrl = buildUpstreamUrl(selected.site.url, '/v1/completions');
const upstreamModel = selected.actualModel || requestedModel;
const forwardBody = { ...body, model: upstreamModel };
const startTime = Date.now();
try {
const upstream = await fetch(targetUrl, withSiteRecordProxyRequestInit(selected.site, {
@@ -78,13 +78,13 @@ export async function completionsProxyRoute(app: FastifyInstance) {
body: JSON.stringify(forwardBody),
}, getProxyUrlFromExtraConfig(selected.account.extraConfig)));
if (!upstream.ok) {
const errText = await upstream.text().catch(() => 'unknown error');
tokenRouter.recordFailure(selected.channel.id, {
if (!upstream.ok) {
const errText = await upstream.text().catch(() => 'unknown error');
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: upstream.status,
errorText: errText,
modelName: selected.actualModel,
});
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
@@ -112,10 +112,10 @@ export async function completionsProxyRoute(app: FastifyInstance) {
});
}
if (shouldRetryProxyRequest(upstream.status, errText) && retryCount < MAX_RETRIES) {
retryCount++;
continue;
}
if (shouldRetryProxyRequest(upstream.status, errText) && canRetryProxyChannel(retryCount)) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
@@ -199,7 +199,9 @@ export async function completionsProxyRoute(app: FastifyInstance) {
parsedUsage,
resolvedUsage,
});
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, selected.actualModel);
await recordTokenRouterEventBestEffort('record channel success', () => (
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, upstreamModel)
));
recordDownstreamCostUsage(request, estimatedCost);
logProxy(
selected,
@@ -230,14 +232,14 @@ export async function completionsProxyRoute(app: FastifyInstance) {
}
const latency = Date.now() - startTime;
const parsedUsage = parseProxyUsage(data);
const failure = detectProxyFailure({ rawText, usage: parsedUsage });
if (failure) {
const errText = failure.reason;
tokenRouter.recordFailure(selected.channel.id, {
const failure = detectProxyFailure({ rawText, usage: parsedUsage });
if (failure) {
const errText = failure.reason;
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: failure.status,
errorText: errText,
modelName: selected.actualModel,
});
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
@@ -256,10 +258,10 @@ export async function completionsProxyRoute(app: FastifyInstance) {
downstreamPath,
);
if (shouldRetryProxyRequest(failure.status, errText) && retryCount < MAX_RETRIES) {
retryCount += 1;
continue;
}
if (shouldRetryProxyRequest(failure.status, errText) && canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
@@ -294,7 +296,9 @@ export async function completionsProxyRoute(app: FastifyInstance) {
resolvedUsage,
});
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, selected.actualModel);
await recordTokenRouterEventBestEffort('record channel success', () => (
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, upstreamModel)
));
recordDownstreamCostUsage(request, estimatedCost);
logProxy(
selected,
@@ -315,11 +319,11 @@ export async function completionsProxyRoute(app: FastifyInstance) {
);
return reply.send(data);
} catch (err: any) {
tokenRouter.recordFailure(selected.channel.id, {
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: 0,
errorText: err.message,
modelName: selected.actualModel,
});
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
@@ -337,10 +341,10 @@ export async function completionsProxyRoute(app: FastifyInstance) {
clientContext,
downstreamPath,
);
if (retryCount < MAX_RETRIES) {
retryCount++;
continue;
}
if (canRetryProxyChannel(retryCount)) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
reason: err.message || 'network failure',
@@ -387,7 +391,7 @@ async function logProxy(
accountId: selected.account.id,
downstreamApiKeyId,
modelRequested,
modelActual: selected.actualModel,
modelActual: selected.actualModel || modelRequested,
status,
httpStatus,
latencyMs,
@@ -405,7 +409,18 @@ async function logProxy(
createdAt,
});
} catch (error) {
console.warn('[proxy/completions] failed to write proxy log', error);
}
}
console.warn('[proxy/completions] failed to write proxy log', error);
}
}
/**
 * Runs a token-router bookkeeping operation best-effort: any rejection (or
 * synchronous throw) is logged with the given label and swallowed so that
 * stats recording can never break the completions proxy path.
 */
async function recordTokenRouterEventBestEffort(
  label: string,
  operation: () => Promise<unknown>,
): Promise<void> {
  // Promise.resolve().then(operation) also captures synchronous throws.
  await Promise.resolve().then(operation).catch((cause: unknown) => {
    console.warn(`[proxy/completions] failed to ${label}`, cause);
  });
}
+122 -110
View File
@@ -14,11 +14,10 @@ import { composeProxyLogMessage } from './logPathMeta.js';
import { formatUtcSqlDateTime } from '../../services/localTimeService.js';
import { resolveProxyLogBilling } from './proxyBilling.js';
import { getProxyAuthContext } from '../../middleware/auth.js';
import { buildUpstreamUrl } from './upstreamUrl.js';
import { detectDownstreamClientContext, type DownstreamClientContext } from './downstreamClientContext.js';
import { insertProxyLog } from '../../services/proxyLogStore.js';
const MAX_RETRIES = 2;
import { buildUpstreamUrl } from './upstreamUrl.js';
import { detectDownstreamClientContext, type DownstreamClientContext } from './downstreamClientContext.js';
import { insertProxyLog } from '../../services/proxyLogStore.js';
import { canRetryProxyChannel, getProxyMaxChannelRetries } from '../../services/proxyChannelRetry.js';
export async function embeddingsProxyRoute(app: FastifyInstance) {
app.post('/v1/embeddings', async (request: FastifyRequest, reply: FastifyReply) => {
@@ -40,7 +39,7 @@ export async function embeddingsProxyRoute(app: FastifyInstance) {
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= MAX_RETRIES) {
while (retryCount <= getProxyMaxChannelRetries()) {
let selected = retryCount === 0
? await tokenRouter.selectChannel(requestedModel, downstreamPolicy)
: await tokenRouter.selectNextChannel(requestedModel, excludeChannelIds, downstreamPolicy);
@@ -57,12 +56,12 @@ export async function embeddingsProxyRoute(app: FastifyInstance) {
});
return reply.code(503).send({ error: { message: 'No available channels', type: 'server_error' } });
}
excludeChannelIds.push(selected.channel.id);
const targetUrl = buildUpstreamUrl(selected.site.url, '/v1/embeddings');
const forwardBody = { ...body, model: selected.actualModel || requestedModel };
const startTime = Date.now();
excludeChannelIds.push(selected.channel.id);
const targetUrl = buildUpstreamUrl(selected.site.url, '/v1/embeddings');
const upstreamModel = selected.actualModel || requestedModel;
const forwardBody = { ...body, model: upstreamModel };
const startTime = Date.now();
try {
const upstream = await fetch(targetUrl, withSiteRecordProxyRequestInit(selected.site, {
@@ -74,30 +73,30 @@ export async function embeddingsProxyRoute(app: FastifyInstance) {
body: JSON.stringify(forwardBody),
}, getProxyUrlFromExtraConfig(selected.account.extraConfig)));
const text = await upstream.text();
if (!upstream.ok) {
tokenRouter.recordFailure(selected.channel.id, {
status: upstream.status,
errorText: text,
modelName: selected.actualModel,
});
logProxy(
selected,
requestedModel,
'failed',
upstream.status,
Date.now() - startTime,
text,
retryCount,
downstreamApiKeyId,
0,
0,
0,
0,
null,
clientContext,
downstreamPath,
);
const text = await upstream.text();
if (!upstream.ok) {
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: upstream.status,
errorText: text,
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
'failed',
upstream.status,
Date.now() - startTime,
text,
retryCount,
downstreamApiKeyId,
0,
0,
0,
0,
null,
clientContext,
downstreamPath,
);
if (isTokenExpiredError({ status: upstream.status, message: text })) {
await reportTokenExpired({
@@ -108,10 +107,10 @@ export async function embeddingsProxyRoute(app: FastifyInstance) {
});
}
if (shouldRetryProxyRequest(upstream.status, text) && retryCount < MAX_RETRIES) {
retryCount++;
continue;
}
if (shouldRetryProxyRequest(upstream.status, text) && canRetryProxyChannel(retryCount)) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
@@ -120,67 +119,69 @@ export async function embeddingsProxyRoute(app: FastifyInstance) {
return reply.code(upstream.status).send({ error: { message: text, type: 'upstream_error' } });
}
let data: any = {};
try { data = JSON.parse(text); } catch { data = {}; }
const latency = Date.now() - startTime;
const parsedUsage = parseProxyUsage(data);
const resolvedUsage = await resolveProxyUsageWithSelfLogFallback({
site: selected.site,
account: selected.account,
tokenValue: selected.tokenValue,
tokenName: selected.tokenName,
modelName: selected.actualModel || requestedModel,
requestStartedAtMs: startTime,
requestEndedAtMs: startTime + latency,
localLatencyMs: latency,
usage: {
promptTokens: parsedUsage.promptTokens,
completionTokens: parsedUsage.completionTokens,
totalTokens: parsedUsage.totalTokens,
},
});
const { estimatedCost, billingDetails } = await resolveProxyLogBilling({
site: selected.site,
account: selected.account,
modelName: selected.actualModel || requestedModel,
parsedUsage,
resolvedUsage,
});
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, selected.actualModel);
recordDownstreamCostUsage(request, estimatedCost);
logProxy(
selected, requestedModel, 'success', upstream.status, latency, null, retryCount, downstreamApiKeyId,
resolvedUsage.promptTokens, resolvedUsage.completionTokens, resolvedUsage.totalTokens, estimatedCost, billingDetails, clientContext, downstreamPath,
);
return reply.code(upstream.status).send(data);
} catch (err: any) {
tokenRouter.recordFailure(selected.channel.id, {
status: 0,
errorText: err.message,
modelName: selected.actualModel,
});
logProxy(
selected,
requestedModel,
'failed',
0,
Date.now() - startTime,
err.message,
retryCount,
downstreamApiKeyId,
0,
0,
0,
0,
null,
clientContext,
downstreamPath,
);
if (retryCount < MAX_RETRIES) {
retryCount++;
continue;
}
let data: any = {};
try { data = JSON.parse(text); } catch { data = {}; }
const latency = Date.now() - startTime;
const parsedUsage = parseProxyUsage(data);
const resolvedUsage = await resolveProxyUsageWithSelfLogFallback({
site: selected.site,
account: selected.account,
tokenValue: selected.tokenValue,
tokenName: selected.tokenName,
modelName: selected.actualModel || requestedModel,
requestStartedAtMs: startTime,
requestEndedAtMs: startTime + latency,
localLatencyMs: latency,
usage: {
promptTokens: parsedUsage.promptTokens,
completionTokens: parsedUsage.completionTokens,
totalTokens: parsedUsage.totalTokens,
},
});
const { estimatedCost, billingDetails } = await resolveProxyLogBilling({
site: selected.site,
account: selected.account,
modelName: selected.actualModel || requestedModel,
parsedUsage,
resolvedUsage,
});
await recordTokenRouterEventBestEffort('record channel success', () => (
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, upstreamModel)
));
recordDownstreamCostUsage(request, estimatedCost);
logProxy(
selected, requestedModel, 'success', upstream.status, latency, null, retryCount, downstreamApiKeyId,
resolvedUsage.promptTokens, resolvedUsage.completionTokens, resolvedUsage.totalTokens, estimatedCost, billingDetails, clientContext, downstreamPath,
);
return reply.code(upstream.status).send(data);
} catch (err: any) {
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: 0,
errorText: err.message,
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
'failed',
0,
Date.now() - startTime,
err.message,
retryCount,
downstreamApiKeyId,
0,
0,
0,
0,
null,
clientContext,
downstreamPath,
);
if (canRetryProxyChannel(retryCount)) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
reason: err.message || 'network failure',
@@ -221,11 +222,11 @@ async function logProxy(
});
await insertProxyLog({
routeId: selected.channel.routeId,
channelId: selected.channel.id,
accountId: selected.account.id,
downstreamApiKeyId,
modelRequested,
modelActual: selected.actualModel,
channelId: selected.channel.id,
accountId: selected.account.id,
downstreamApiKeyId,
modelRequested,
modelActual: selected.actualModel || modelRequested,
status,
httpStatus,
latencyMs,
@@ -242,8 +243,19 @@ async function logProxy(
retryCount,
createdAt,
});
} catch (error) {
console.warn('[proxy/embeddings] failed to write proxy log', error);
}
}
} catch (error) {
console.warn('[proxy/embeddings] failed to write proxy log', error);
}
}
/**
 * Runs a token-router bookkeeping operation in best-effort mode.
 *
 * The proxy response has already been (or is about to be) sent, so a
 * failure while recording channel success/failure must never propagate —
 * it is logged with the supplied label and otherwise swallowed.
 *
 * @param label     Human-readable description used in the warning log.
 * @param operation Async bookkeeping call to attempt.
 */
async function recordTokenRouterEventBestEffort(
  label: string,
  operation: () => Promise<unknown>,
): Promise<void> {
  // Promise.resolve().then(...) funnels both sync throws and async
  // rejections from the operation into the single catch handler below.
  await Promise.resolve()
    .then(operation)
    .catch((error: unknown) => {
      console.warn(`[proxy/embeddings] failed to ${label}`, error);
    });
}
@@ -220,6 +220,84 @@ describe('executeEndpointFlow', () => {
expect(fetchMock).toHaveBeenCalledTimes(1);
});
it('uses recovered request metadata for success callbacks', async () => {
fetchMock.mockResolvedValueOnce(toUndiciResponse(new Response(JSON.stringify({
error: { message: 'upstream_error', type: 'upstream_error' },
}), {
status: 400,
headers: { 'content-type': 'application/json' },
})));
const recovered = toUndiciResponse(new Response(JSON.stringify({ ok: 'recovered' }), {
status: 200,
headers: { 'content-type': 'application/json' },
}));
const onAttemptSuccess = vi.fn();
await executeEndpointFlow({
siteUrl: 'https://example.com',
endpointCandidates: ['responses'],
buildRequest: () => requestFor('/v1/responses'),
tryRecover: async () => ({
upstream: recovered,
upstreamPath: '/v1/messages',
request: { ...requestFor('/v1/messages'), endpoint: 'messages' },
}),
onAttemptSuccess,
});
expect(onAttemptSuccess).toHaveBeenCalledTimes(1);
expect(onAttemptSuccess.mock.calls[0]?.[0]?.request?.path).toBe('/v1/messages');
expect(onAttemptSuccess.mock.calls[0]?.[0]?.targetUrl).toBe('https://example.com/v1/messages');
});
it('does not let attempt hook failures change routing', async () => {
fetchMock
.mockResolvedValueOnce(toUndiciResponse(new Response(JSON.stringify({
error: { message: 'unsupported endpoint', type: 'invalid_request_error' },
}), {
status: 404,
headers: { 'content-type': 'application/json' },
})))
.mockResolvedValueOnce(toUndiciResponse(new Response(JSON.stringify({ ok: true }), {
status: 200,
headers: { 'content-type': 'application/json' },
})));
const result = await executeEndpointFlow({
siteUrl: 'https://example.com',
endpointCandidates: ['responses', 'chat'],
buildRequest: (endpoint) => endpoint === 'responses'
? requestFor('/v1/responses')
: { ...requestFor('/v1/chat/completions'), endpoint },
shouldDowngrade: () => true,
onAttemptFailure: async () => {
throw new Error('failure hook should be ignored');
},
onAttemptSuccess: async () => {
throw new Error('success hook should be ignored');
},
});
expect(result.ok).toBe(true);
expect(fetchMock).toHaveBeenCalledTimes(2);
});
it('uses proxyUrl for the default fetch path when no dispatch hook is provided', async () => {
fetchMock.mockResolvedValueOnce(toUndiciResponse(new Response(JSON.stringify({ ok: true }), {
status: 200,
headers: { 'content-type': 'application/json' },
})));
await executeEndpointFlow({
siteUrl: 'https://example.com',
proxyUrl: 'https://proxy.internal/base',
endpointCandidates: ['responses'],
buildRequest: () => requestFor('/v1/responses'),
});
expect(fetchMock.mock.calls[0]?.[0]).toBe('https://proxy.internal/base/v1/responses');
});
it('returns normalized final error when all endpoints fail', async () => {
fetchMock.mockResolvedValueOnce(toUndiciResponse(new Response(JSON.stringify({
error: { message: 'upstream_error', type: 'upstream_error' },
+41 -11
View File
@@ -39,6 +39,8 @@ export type EndpointAttemptSuccessContext = {
export type EndpointRecoverResult = {
upstream: Awaited<ReturnType<typeof fetch>>;
upstreamPath: string;
request?: BuiltEndpointRequest;
targetUrl?: string;
} | null;
export type EndpointFlowResult =
@@ -60,6 +62,7 @@ export function withUpstreamPath(path: string, message: string): string {
type ExecuteEndpointFlowInput = {
siteUrl: string;
proxyUrl?: string | null;
endpointCandidates: UpstreamEndpoint[];
buildRequest: (endpoint: UpstreamEndpoint, endpointIndex: number) => BuiltEndpointRequest;
dispatchRequest?: (
@@ -73,6 +76,24 @@ type ExecuteEndpointFlowInput = {
onAttemptSuccess?: (ctx: EndpointAttemptSuccessContext) => void | Promise<void>;
};
/**
 * Joins a base URL and a path with exactly one slash between them.
 *
 * Trailing slashes on the base and leading slashes on the path are
 * stripped before joining; a path that is empty (or consists only of
 * slashes) yields the trimmed base alone.
 *
 * @param base Base URL, e.g. a proxy prefix like "https://proxy/base/".
 * @param path Request path, e.g. "/v1/responses".
 * @returns The combined absolute URL.
 */
function buildAbsoluteUrl(base: string, path: string): string {
  let trimmedBase = base;
  while (trimmedBase.endsWith('/')) {
    trimmedBase = trimmedBase.slice(0, -1);
  }
  let trimmedPath = path;
  while (trimmedPath.startsWith('/')) {
    trimmedPath = trimmedPath.slice(1);
  }
  if (trimmedPath === '') {
    return trimmedBase;
  }
  return `${trimmedBase}/${trimmedPath}`;
}
/**
 * Invokes an optional endpoint-flow observer hook, isolating its failures.
 *
 * Hooks (onAttemptSuccess / onAttemptFailure / onDowngrade) are purely
 * observational: if one throws or rejects, the error is logged under the
 * given hook name and discarded so routing decisions are never affected.
 *
 * @param hook     Optional callback to invoke; a no-op when absent.
 * @param ctx      Context object passed straight through to the hook.
 * @param hookName Label used in the error log when the hook fails.
 */
async function runEndpointFlowHook<T>(
  hook: ((ctx: T) => void | Promise<void>) | undefined,
  ctx: T,
  hookName: string,
): Promise<void> {
  if (!hook) return;
  // Route sync throws and async rejections through one catch handler.
  await Promise.resolve()
    .then(() => hook(ctx))
    .catch((error: unknown) => {
      console.error(`endpointFlow ${hookName} hook failed`, error);
    });
}
export async function executeEndpointFlow(input: ExecuteEndpointFlowInput): Promise<EndpointFlowResult> {
const endpointCount = input.endpointCandidates.length;
if (endpointCount <= 0) {
@@ -90,7 +111,10 @@ export async function executeEndpointFlow(input: ExecuteEndpointFlowInput): Prom
for (let endpointIndex = 0; endpointIndex < endpointCount; endpointIndex += 1) {
const endpoint = input.endpointCandidates[endpointIndex] as UpstreamEndpoint;
const request = input.buildRequest(endpoint, endpointIndex);
const targetUrl = buildUpstreamUrl(input.siteUrl, request.path);
const defaultTarget = buildUpstreamUrl(input.siteUrl, request.path);
const targetUrl = input.proxyUrl
? buildAbsoluteUrl(input.proxyUrl, request.path)
: defaultTarget;
let response = input.dispatchRequest
? await input.dispatchRequest(request, targetUrl)
@@ -101,13 +125,13 @@ export async function executeEndpointFlow(input: ExecuteEndpointFlowInput): Prom
}));
if (response.ok) {
await input.onAttemptSuccess?.({
await runEndpointFlowHook(input.onAttemptSuccess, {
endpointIndex,
endpointCount,
request,
targetUrl,
response,
});
}, 'onAttemptSuccess');
return {
ok: true,
upstream: response,
@@ -128,13 +152,19 @@ export async function executeEndpointFlow(input: ExecuteEndpointFlowInput): Prom
if (input.tryRecover) {
const recovered = await input.tryRecover(baseContext);
if (recovered?.upstream?.ok) {
await input.onAttemptSuccess?.({
const recoveredRequest = recovered.request ?? baseContext.request;
const recoveredTargetUrl = recovered.targetUrl ?? (
input.proxyUrl
? buildAbsoluteUrl(input.proxyUrl, recovered.upstreamPath)
: buildUpstreamUrl(input.siteUrl, recovered.upstreamPath)
);
await runEndpointFlowHook(input.onAttemptSuccess, {
endpointIndex,
endpointCount,
request: baseContext.request,
targetUrl: baseContext.targetUrl,
request: recoveredRequest,
targetUrl: recoveredTargetUrl,
response: recovered.upstream,
});
}, 'onAttemptSuccess');
return {
ok: true,
upstream: recovered.upstream,
@@ -150,18 +180,18 @@ export async function executeEndpointFlow(input: ExecuteEndpointFlowInput): Prom
baseContext.request.path,
summarizeUpstreamError(response.status, rawErrText),
);
await input.onAttemptFailure?.({
await runEndpointFlowHook(input.onAttemptFailure, {
...baseContext,
errText,
});
}, 'onAttemptFailure');
const isLastEndpoint = endpointIndex >= endpointCount - 1;
const shouldDowngrade = !isLastEndpoint && !!input.shouldDowngrade?.(baseContext);
if (shouldDowngrade) {
await input.onDowngrade?.({
await runEndpointFlowHook(input.onDowngrade, {
...baseContext,
errText,
});
}, 'onDowngrade');
continue;
}
+108 -2
View File
@@ -1345,7 +1345,7 @@ describe('gemini native proxy routes', () => {
});
expect(response.statusCode).toBe(200);
expect(recordSuccessMock).toHaveBeenCalledWith(11, expect.any(Number), 0);
expect(recordSuccessMock).toHaveBeenCalledWith(11, expect.any(Number), 0, 'gemini-2.5-flash');
expect(dbInsertMock).toHaveBeenCalledTimes(1);
expect(dbInsertValuesMock).toHaveBeenCalledWith(expect.objectContaining({
routeId: 22,
@@ -1364,6 +1364,70 @@ describe('gemini native proxy routes', () => {
}));
});
it('keeps returning a successful Gemini response when channel success bookkeeping fails', async () => {
fetchMock.mockResolvedValue(new Response(JSON.stringify({
candidates: [
{
content: {
parts: [{ text: 'hello despite bookkeeping failure' }],
role: 'model',
},
finishReason: 'STOP',
},
],
usageMetadata: {
promptTokenCount: 10,
candidatesTokenCount: 5,
totalTokenCount: 15,
},
}), {
status: 200,
headers: { 'content-type': 'application/json' },
}));
recordSuccessMock.mockImplementation(async () => {
throw new Error('record success failed');
});
const response = await app.inject({
method: 'POST',
url: '/v1beta/models/gemini-2.5-flash:generateContent',
headers: {
'x-goog-api-key': 'sk-managed-gemini',
},
payload: {
contents: [
{
role: 'user',
parts: [{ text: 'hello' }],
},
],
},
});
expect(response.statusCode).toBe(200);
expect(selectNextChannelMock).not.toHaveBeenCalled();
expect(recordFailureMock).not.toHaveBeenCalled();
expect(response.json()).toEqual({
responseId: '',
modelVersion: '',
candidates: [
{
index: 0,
content: {
parts: [{ text: 'hello despite bookkeeping failure' }],
role: 'model',
},
finishReason: 'STOP',
},
],
usageMetadata: {
promptTokenCount: 10,
candidatesTokenCount: 5,
totalTokenCount: 15,
},
});
});
it('forwards explicit gemini version paths through transformer-owned parsing helpers', async () => {
fetchMock.mockResolvedValue(new Response(JSON.stringify({
candidates: [
@@ -1716,6 +1780,48 @@ describe('gemini native proxy routes', () => {
expect(response.body).toContain('data: [DONE]\r\n\r\n');
});
it('does not retry a Gemini SSE stream when channel success bookkeeping fails after bytes are written', async () => {
const encoder = new TextEncoder();
const upstreamBody = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(encoder.encode('data: {"candidates":[{"content":{"role":"model","parts":[{"text":"hello after bookkeeping failure"}]},"finishReason":"STOP"}],"usageMetadata":{"promptTokenCount":7,"candidatesTokenCount":4,"totalTokenCount":11}}\r\n\r\n'));
controller.enqueue(encoder.encode('data: [DONE]\r\n\r\n'));
controller.close();
},
});
fetchMock.mockResolvedValue(new Response(upstreamBody, {
status: 200,
headers: { 'content-type': 'text/event-stream; charset=utf-8' },
}));
recordSuccessMock.mockImplementation(async () => {
throw new Error('record success failed');
});
const response = await app.inject({
method: 'POST',
url: '/v1beta/models/gemini-2.5-flash:streamGenerateContent?alt=sse',
headers: {
'x-goog-api-key': 'sk-managed-gemini',
},
payload: {
contents: [
{
role: 'user',
parts: [{ text: 'hello' }],
},
],
},
});
expect(response.statusCode).toBe(200);
expect(response.headers['content-type']).toContain('text/event-stream');
expect(response.body).toContain('hello after bookkeeping failure');
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(selectNextChannelMock).not.toHaveBeenCalled();
expect(recordFailureMock).not.toHaveBeenCalled();
});
it('falls back to the next channel when first Gemini channel returns 400 before any bytes are written', async () => {
selectNextChannelMock.mockReturnValue({
channel: { id: 12, routeId: 22 },
@@ -2015,6 +2121,6 @@ describe('gemini native proxy routes', () => {
totalTokens: 17,
errorMessage: '[downstream:/v1beta/models/gemini-2.5-flash:streamGenerateContent] [upstream:/v1beta/models/gemini-2.5-flash:streamGenerateContent]',
}));
expect(recordSuccessMock).toHaveBeenCalledWith(12, expect.any(Number), 0);
expect(recordSuccessMock).toHaveBeenCalledWith(12, expect.any(Number), 0, 'gemini-2.5-flash');
});
});
@@ -143,6 +143,64 @@ describe('/v1/images/edits route', () => {
expect(targetUrl).toBe('https://upstream.example.com/v1/images/edits');
});
it('returns an upstream error instead of fabricating an empty success when image generation JSON is malformed', async () => {
fetchMock.mockResolvedValue(new Response('not-json', {
status: 200,
headers: { 'content-type': 'application/json' },
}));
const response = await app.inject({
method: 'POST',
url: '/v1/images/generations',
headers: {
authorization: 'Bearer sk-demo',
},
payload: {
model: 'gpt-image-1',
prompt: 'draw a cat',
},
});
expect(response.statusCode).toBe(502);
expect(response.json()).toEqual({
error: {
message: 'not-json',
type: 'upstream_error',
},
});
expect(selectNextChannelMock).not.toHaveBeenCalled();
});
it('keeps returning a successful image edit response when post-success accounting fails', async () => {
fetchMock.mockResolvedValue(new Response(JSON.stringify({
created: 1,
data: [{ b64_json: 'iVBORw0KGgo=' }],
}), {
status: 200,
headers: { 'content-type': 'application/json' },
}));
estimateProxyCostMock.mockRejectedValueOnce(new Error('cost failed'));
const boundary = 'metapi-boundary-accounting';
const response = await app.inject({
method: 'POST',
url: '/v1/images/edits',
headers: {
authorization: 'Bearer sk-demo',
'content-type': `multipart/form-data; boundary=${boundary}`,
},
payload: buildMultipartBody(boundary),
});
expect(response.statusCode).toBe(200);
expect(response.json()).toEqual({
created: 1,
data: [{ b64_json: 'iVBORw0KGgo=' }],
});
expect(selectNextChannelMock).not.toHaveBeenCalled();
expect(recordFailureMock).not.toHaveBeenCalled();
});
it('returns explicit not-supported error for /v1/images/variations', async () => {
const response = await app.inject({
method: 'POST',
+465 -380
View File
@@ -1,383 +1,468 @@
import { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify';
import { fetch } from 'undici';
import { tokenRouter } from '../../services/tokenRouter.js';
import { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify';
import { fetch } from 'undici';
import { tokenRouter } from '../../services/tokenRouter.js';
import * as routeRefreshWorkflow from '../../services/routeRefreshWorkflow.js';
import { reportProxyAllFailed, reportTokenExpired } from '../../services/alertService.js';
import { isTokenExpiredError } from '../../services/alertRules.js';
import { estimateProxyCost } from '../../services/modelPricingService.js';
import { shouldRetryProxyRequest } from '../../services/proxyRetryPolicy.js';
import { ensureModelAllowedForDownstreamKey, getDownstreamRoutingPolicy, recordDownstreamCostUsage } from './downstreamPolicy.js';
import { withSiteRecordProxyRequestInit } from '../../services/siteProxy.js';
import { getProxyUrlFromExtraConfig } from '../../services/accountExtraConfig.js';
import { composeProxyLogMessage } from './logPathMeta.js';
import { formatUtcSqlDateTime } from '../../services/localTimeService.js';
import { cloneFormDataWithOverrides, ensureMultipartBufferParser, parseMultipartFormData } from './multipart.js';
import { getProxyAuthContext } from '../../middleware/auth.js';
import { buildUpstreamUrl } from './upstreamUrl.js';
import { detectDownstreamClientContext, type DownstreamClientContext } from './downstreamClientContext.js';
import { insertProxyLog } from '../../services/proxyLogStore.js';
const MAX_RETRIES = 2;
export async function imagesProxyRoute(app: FastifyInstance) {
ensureMultipartBufferParser(app);
app.post('/v1/images/generations', async (request: FastifyRequest, reply: FastifyReply) => {
const body = request.body as any;
const requestedModel = body?.model || 'gpt-image-1';
if (!await ensureModelAllowedForDownstreamKey(request, reply, requestedModel)) return;
const downstreamPolicy = getDownstreamRoutingPolicy(request);
const downstreamApiKeyId = getProxyAuthContext(request)?.keyId ?? null;
const downstreamPath = '/v1/images/generations';
const clientContext = detectDownstreamClientContext({
downstreamPath,
headers: request.headers as Record<string, unknown>,
body,
});
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= MAX_RETRIES) {
let selected = retryCount === 0
? await tokenRouter.selectChannel(requestedModel, downstreamPolicy)
: await tokenRouter.selectNextChannel(requestedModel, excludeChannelIds, downstreamPolicy);
if (!selected && retryCount === 0) {
import { reportProxyAllFailed, reportTokenExpired } from '../../services/alertService.js';
import { isTokenExpiredError } from '../../services/alertRules.js';
import { estimateProxyCost } from '../../services/modelPricingService.js';
import { shouldRetryProxyRequest } from '../../services/proxyRetryPolicy.js';
import { ensureModelAllowedForDownstreamKey, getDownstreamRoutingPolicy, recordDownstreamCostUsage } from './downstreamPolicy.js';
import { withSiteRecordProxyRequestInit } from '../../services/siteProxy.js';
import { getProxyUrlFromExtraConfig } from '../../services/accountExtraConfig.js';
import { composeProxyLogMessage } from './logPathMeta.js';
import { formatUtcSqlDateTime } from '../../services/localTimeService.js';
import { cloneFormDataWithOverrides, ensureMultipartBufferParser, parseMultipartFormData } from './multipart.js';
import { getProxyAuthContext } from '../../middleware/auth.js';
import { buildUpstreamUrl } from './upstreamUrl.js';
import { detectDownstreamClientContext, type DownstreamClientContext } from './downstreamClientContext.js';
import { insertProxyLog } from '../../services/proxyLogStore.js';
import { canRetryProxyChannel, getProxyMaxChannelRetries } from '../../services/proxyChannelRetry.js';
export async function imagesProxyRoute(app: FastifyInstance) {
ensureMultipartBufferParser(app);
app.post('/v1/images/generations', async (request: FastifyRequest, reply: FastifyReply) => {
const body = request.body as any;
const requestedModel = body?.model || 'gpt-image-1';
if (!await ensureModelAllowedForDownstreamKey(request, reply, requestedModel)) return;
const downstreamPolicy = getDownstreamRoutingPolicy(request);
const downstreamApiKeyId = getProxyAuthContext(request)?.keyId ?? null;
const downstreamPath = '/v1/images/generations';
const clientContext = detectDownstreamClientContext({
downstreamPath,
headers: request.headers as Record<string, unknown>,
body,
});
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= getProxyMaxChannelRetries()) {
let selected = retryCount === 0
? await tokenRouter.selectChannel(requestedModel, downstreamPolicy)
: await tokenRouter.selectNextChannel(requestedModel, excludeChannelIds, downstreamPolicy);
if (!selected && retryCount === 0) {
await routeRefreshWorkflow.refreshModelsAndRebuildRoutes();
selected = await tokenRouter.selectChannel(requestedModel, downstreamPolicy);
}
if (!selected) {
await reportProxyAllFailed({
model: requestedModel,
reason: 'No available channels after retries',
});
return reply.code(503).send({
error: { message: 'No available channels for this model', type: 'server_error' },
});
}
excludeChannelIds.push(selected.channel.id);
const targetUrl = buildUpstreamUrl(selected.site.url, '/v1/images/generations');
const forwardBody = { ...body, model: selected.actualModel };
const startTime = Date.now();
try {
const upstream = await fetch(targetUrl, withSiteRecordProxyRequestInit(selected.site, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${selected.tokenValue}`,
},
body: JSON.stringify(forwardBody),
}, getProxyUrlFromExtraConfig(selected.account.extraConfig)));
const text = await upstream.text();
if (!upstream.ok) {
tokenRouter.recordFailure(selected.channel.id, {
status: upstream.status,
errorText: text,
modelName: selected.actualModel,
});
logProxy(
selected,
requestedModel,
'failed',
upstream.status,
Date.now() - startTime,
text,
retryCount,
downstreamApiKeyId,
0,
downstreamPath,
clientContext,
);
if (isTokenExpiredError({ status: upstream.status, message: text })) {
await reportTokenExpired({
accountId: selected.account.id,
username: selected.account.username,
siteName: selected.site.name,
detail: `HTTP ${upstream.status}`,
});
}
if (shouldRetryProxyRequest(upstream.status, text) && retryCount < MAX_RETRIES) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
reason: `upstream returned HTTP ${upstream.status}`,
});
return reply.code(upstream.status).send({ error: { message: text, type: 'upstream_error' } });
}
let data: any = {};
try { data = JSON.parse(text); } catch { data = { data: [] }; }
const latency = Date.now() - startTime;
const estimatedCost = await estimateProxyCost({
site: selected.site,
account: selected.account,
modelName: selected.actualModel || requestedModel,
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
});
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, selected.actualModel);
recordDownstreamCostUsage(request, estimatedCost);
logProxy(selected, requestedModel, 'success', upstream.status, latency, null, retryCount, downstreamApiKeyId, estimatedCost, downstreamPath, clientContext);
return reply.code(upstream.status).send(data);
} catch (err: any) {
tokenRouter.recordFailure(selected.channel.id, {
status: 0,
errorText: err.message,
modelName: selected.actualModel,
});
logProxy(
selected,
requestedModel,
'failed',
0,
Date.now() - startTime,
err.message,
retryCount,
downstreamApiKeyId,
0,
downstreamPath,
clientContext,
);
if (retryCount < MAX_RETRIES) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
reason: err.message || 'network failure',
});
return reply.code(502).send({
error: { message: `Upstream error: ${err.message}`, type: 'upstream_error' },
});
}
}
});
app.post('/v1/images/edits', async (request: FastifyRequest, reply: FastifyReply) => {
const multipartForm = await parseMultipartFormData(request);
const jsonBody = (!multipartForm && request.body && typeof request.body === 'object')
? request.body as Record<string, unknown>
: null;
const requestedModel = typeof multipartForm?.get('model') === 'string'
? String(multipartForm.get('model')).trim()
: (typeof jsonBody?.model === 'string' ? jsonBody.model.trim() : '') || 'gpt-image-1';
if (!await ensureModelAllowedForDownstreamKey(request, reply, requestedModel)) return;
const downstreamPolicy = getDownstreamRoutingPolicy(request);
const downstreamApiKeyId = getProxyAuthContext(request)?.keyId ?? null;
const downstreamPath = '/v1/images/edits';
const clientContext = detectDownstreamClientContext({
downstreamPath,
headers: request.headers as Record<string, unknown>,
body: jsonBody || Object.fromEntries(multipartForm?.entries?.() || []),
});
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= MAX_RETRIES) {
let selected = retryCount === 0
? await tokenRouter.selectChannel(requestedModel, downstreamPolicy)
: await tokenRouter.selectNextChannel(requestedModel, excludeChannelIds, downstreamPolicy);
if (!selected && retryCount === 0) {
selected = await tokenRouter.selectChannel(requestedModel, downstreamPolicy);
}
if (!selected) {
await reportProxyAllFailed({
model: requestedModel,
reason: 'No available channels after retries',
});
return reply.code(503).send({
error: { message: 'No available channels for this model', type: 'server_error' },
});
}
excludeChannelIds.push(selected.channel.id);
const targetUrl = buildUpstreamUrl(selected.site.url, '/v1/images/generations');
const upstreamModel = selected.actualModel || requestedModel;
const forwardBody = { ...body, model: upstreamModel };
const startTime = Date.now();
try {
const upstream = await fetch(targetUrl, withSiteRecordProxyRequestInit(selected.site, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${selected.tokenValue}`,
},
body: JSON.stringify(forwardBody),
}, getProxyUrlFromExtraConfig(selected.account.extraConfig)));
const text = await upstream.text();
if (!upstream.ok) {
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: upstream.status,
errorText: text,
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
'failed',
upstream.status,
Date.now() - startTime,
text,
retryCount,
downstreamApiKeyId,
0,
downstreamPath,
clientContext,
);
if (isTokenExpiredError({ status: upstream.status, message: text })) {
await reportTokenExpired({
accountId: selected.account.id,
username: selected.account.username,
siteName: selected.site.name,
detail: `HTTP ${upstream.status}`,
});
}
if (shouldRetryProxyRequest(upstream.status, text) && canRetryProxyChannel(retryCount)) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
reason: `upstream returned HTTP ${upstream.status}`,
});
return reply.code(upstream.status).send({ error: { message: text, type: 'upstream_error' } });
}
const data = parseUpstreamImageResponse(text);
if (!data.ok) {
await recordTokenRouterEventBestEffort('record malformed upstream response', () => tokenRouter.recordFailure(selected.channel.id, {
status: 502,
errorText: data.message,
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
'failed',
502,
Date.now() - startTime,
data.message,
retryCount,
downstreamApiKeyId,
0,
downstreamPath,
clientContext,
);
await reportProxyAllFailed({
model: requestedModel,
reason: data.message,
});
return reply.code(502).send({
error: { message: data.message, type: 'upstream_error' },
});
}
const latency = Date.now() - startTime;
let estimatedCost = 0;
await recordTokenRouterEventBestEffort('estimate proxy cost', async () => {
estimatedCost = await estimateProxyCost({
site: selected.site,
account: selected.account,
modelName: upstreamModel,
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
});
});
await recordTokenRouterEventBestEffort('record channel success', () => (
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, upstreamModel)
));
await recordTokenRouterEventBestEffort('record downstream cost usage', () => (
recordDownstreamCostUsage(request, estimatedCost)
));
logProxy(selected, requestedModel, 'success', upstream.status, latency, null, retryCount, downstreamApiKeyId, estimatedCost, downstreamPath, clientContext);
return reply.code(upstream.status).send(data.value);
} catch (err: any) {
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: 0,
errorText: err.message,
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
'failed',
0,
Date.now() - startTime,
err.message,
retryCount,
downstreamApiKeyId,
0,
downstreamPath,
clientContext,
);
if (canRetryProxyChannel(retryCount)) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
reason: err.message || 'network failure',
});
return reply.code(502).send({
error: { message: `Upstream error: ${err.message}`, type: 'upstream_error' },
});
}
}
});
app.post('/v1/images/edits', async (request: FastifyRequest, reply: FastifyReply) => {
const multipartForm = await parseMultipartFormData(request);
const jsonBody = (!multipartForm && request.body && typeof request.body === 'object')
? request.body as Record<string, unknown>
: null;
const requestedModel = typeof multipartForm?.get('model') === 'string'
? String(multipartForm.get('model')).trim()
: (typeof jsonBody?.model === 'string' ? jsonBody.model.trim() : '') || 'gpt-image-1';
if (!await ensureModelAllowedForDownstreamKey(request, reply, requestedModel)) return;
const downstreamPolicy = getDownstreamRoutingPolicy(request);
const downstreamApiKeyId = getProxyAuthContext(request)?.keyId ?? null;
const downstreamPath = '/v1/images/edits';
const clientContext = detectDownstreamClientContext({
downstreamPath,
headers: request.headers as Record<string, unknown>,
body: jsonBody || Object.fromEntries(multipartForm?.entries?.() || []),
});
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= getProxyMaxChannelRetries()) {
let selected = retryCount === 0
? await tokenRouter.selectChannel(requestedModel, downstreamPolicy)
: await tokenRouter.selectNextChannel(requestedModel, excludeChannelIds, downstreamPolicy);
if (!selected && retryCount === 0) {
await routeRefreshWorkflow.refreshModelsAndRebuildRoutes();
selected = await tokenRouter.selectChannel(requestedModel, downstreamPolicy);
}
if (!selected) {
await reportProxyAllFailed({
model: requestedModel,
reason: 'No available channels after retries',
});
return reply.code(503).send({
error: { message: 'No available channels for this model', type: 'server_error' },
});
}
excludeChannelIds.push(selected.channel.id);
const targetUrl = buildUpstreamUrl(selected.site.url, '/v1/images/edits');
const startTime = Date.now();
try {
const requestInit = multipartForm
? withSiteRecordProxyRequestInit(selected.site, {
method: 'POST',
headers: {
Authorization: `Bearer ${selected.tokenValue}`,
},
body: cloneFormDataWithOverrides(multipartForm, {
model: selected.actualModel || requestedModel,
}) as any,
}, getProxyUrlFromExtraConfig(selected.account.extraConfig))
: withSiteRecordProxyRequestInit(selected.site, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${selected.tokenValue}`,
},
body: JSON.stringify({
...(jsonBody || {}),
model: selected.actualModel || requestedModel,
}),
}, getProxyUrlFromExtraConfig(selected.account.extraConfig));
const upstream = await fetch(targetUrl, requestInit);
const text = await upstream.text();
if (!upstream.ok) {
tokenRouter.recordFailure(selected.channel.id, {
status: upstream.status,
errorText: text,
modelName: selected.actualModel,
});
logProxy(
selected,
requestedModel,
'failed',
upstream.status,
Date.now() - startTime,
text,
retryCount,
downstreamApiKeyId,
0,
downstreamPath,
clientContext,
);
if (isTokenExpiredError({ status: upstream.status, message: text })) {
await reportTokenExpired({
accountId: selected.account.id,
username: selected.account.username,
siteName: selected.site.name,
detail: `HTTP ${upstream.status}`,
});
}
if (shouldRetryProxyRequest(upstream.status, text) && retryCount < MAX_RETRIES) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
reason: `upstream returned HTTP ${upstream.status}`,
});
return reply.code(upstream.status).send({ error: { message: text, type: 'upstream_error' } });
}
let data: any = {};
try { data = JSON.parse(text); } catch { data = { data: [] }; }
const latency = Date.now() - startTime;
const estimatedCost = await estimateProxyCost({
site: selected.site,
account: selected.account,
modelName: selected.actualModel || requestedModel,
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
});
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, selected.actualModel);
recordDownstreamCostUsage(request, estimatedCost);
logProxy(selected, requestedModel, 'success', upstream.status, latency, null, retryCount, downstreamApiKeyId, estimatedCost, downstreamPath, clientContext);
return reply.code(upstream.status).send(data);
} catch (err: any) {
tokenRouter.recordFailure(selected.channel.id, {
status: 0,
errorText: err.message,
modelName: selected.actualModel,
});
logProxy(
selected,
requestedModel,
'failed',
0,
Date.now() - startTime,
err.message,
retryCount,
downstreamApiKeyId,
0,
downstreamPath,
clientContext,
);
if (retryCount < MAX_RETRIES) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
reason: err.message || 'network failure',
});
return reply.code(502).send({
error: { message: `Upstream error: ${err.message}`, type: 'upstream_error' },
});
}
}
});
// /v1/images/variations has no upstream support; always answer 400.
app.post('/v1/images/variations', async (_request: FastifyRequest, reply: FastifyReply) => {
  const unsupported = {
    error: {
      message: 'Image variations are not supported',
      type: 'invalid_request_error',
    },
  };
  return reply.code(400).send(unsupported);
});
}
/**
 * Best-effort proxy-log writer for the image routes.
 *
 * Composes a normalized error message (tagging the detected client kind,
 * session id and trace hint when present) and persists a single proxy-log
 * row. Storage failures are logged and swallowed so logging can never break
 * the request path.
 *
 * @param selected channel selection bundle (channel/account/actualModel);
 *   typed `any` — shape assumed from call sites, TODO confirm against router
 * @param modelRequested model name the downstream client asked for
 * @param status outcome label (call sites pass 'success' / 'failed')
 * @param httpStatus upstream HTTP status (0 for network-level failures)
 * @param latencyMs elapsed upstream attempt time in milliseconds
 * @param errorMessage raw upstream or exception text, null on success
 * @param retryCount zero-based attempt index at log time
 */
async function logProxy(
  selected: any,
  modelRequested: string,
  status: string,
  httpStatus: number,
  latencyMs: number,
  errorMessage: string | null,
  retryCount: number,
  downstreamApiKeyId: number | null = null,
  estimatedCost = 0,
  downstreamPath = '/v1/images/generations',
  clientContext: DownstreamClientContext | null = null,
) {
  try {
    const createdAt = formatUtcSqlDateTime(new Date());
    const normalizedErrorMessage = composeProxyLogMessage({
      clientKind: clientContext?.clientKind && clientContext.clientKind !== 'generic'
        ? clientContext.clientKind
        : null,
      sessionId: clientContext?.sessionId || null,
      traceHint: clientContext?.traceHint || null,
      downstreamPath,
      errorMessage,
    });
    await insertProxyLog({
      routeId: selected.channel.routeId,
      channelId: selected.channel.id,
      accountId: selected.account.id,
      downstreamApiKeyId,
      modelRequested,
      // Fall back to the requested model so the row never records an empty
      // actual model (consistent with the other proxy log writers).
      modelActual: selected.actualModel || modelRequested,
      status,
      httpStatus,
      latencyMs,
      promptTokens: 0,
      completionTokens: 0,
      totalTokens: 0,
      estimatedCost,
      clientFamily: clientContext?.clientKind || null,
      clientAppId: clientContext?.clientAppId || null,
      clientAppName: clientContext?.clientAppName || null,
      clientConfidence: clientContext?.clientConfidence || null,
      errorMessage: normalizedErrorMessage,
      retryCount,
      createdAt,
    });
  } catch (error) {
    console.warn('[proxy/images] failed to write proxy log', error);
  }
}
selected = await tokenRouter.selectChannel(requestedModel, downstreamPolicy);
}
if (!selected) {
await reportProxyAllFailed({
model: requestedModel,
reason: 'No available channels after retries',
});
return reply.code(503).send({
error: { message: 'No available channels for this model', type: 'server_error' },
});
}
excludeChannelIds.push(selected.channel.id);
const targetUrl = buildUpstreamUrl(selected.site.url, '/v1/images/edits');
const upstreamModel = selected.actualModel || requestedModel;
const startTime = Date.now();
try {
const requestInit = multipartForm
? withSiteRecordProxyRequestInit(selected.site, {
method: 'POST',
headers: {
Authorization: `Bearer ${selected.tokenValue}`,
},
body: cloneFormDataWithOverrides(multipartForm, {
model: upstreamModel,
}) as any,
}, getProxyUrlFromExtraConfig(selected.account.extraConfig))
: withSiteRecordProxyRequestInit(selected.site, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${selected.tokenValue}`,
},
body: JSON.stringify({
...(jsonBody || {}),
model: upstreamModel,
}),
}, getProxyUrlFromExtraConfig(selected.account.extraConfig));
const upstream = await fetch(targetUrl, requestInit);
const text = await upstream.text();
if (!upstream.ok) {
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: upstream.status,
errorText: text,
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
'failed',
upstream.status,
Date.now() - startTime,
text,
retryCount,
downstreamApiKeyId,
0,
downstreamPath,
clientContext,
);
if (isTokenExpiredError({ status: upstream.status, message: text })) {
await reportTokenExpired({
accountId: selected.account.id,
username: selected.account.username,
siteName: selected.site.name,
detail: `HTTP ${upstream.status}`,
});
}
if (shouldRetryProxyRequest(upstream.status, text) && canRetryProxyChannel(retryCount)) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
reason: `upstream returned HTTP ${upstream.status}`,
});
return reply.code(upstream.status).send({ error: { message: text, type: 'upstream_error' } });
}
const data = parseUpstreamImageResponse(text);
if (!data.ok) {
await recordTokenRouterEventBestEffort('record malformed upstream response', () => tokenRouter.recordFailure(selected.channel.id, {
status: 502,
errorText: data.message,
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
'failed',
502,
Date.now() - startTime,
data.message,
retryCount,
downstreamApiKeyId,
0,
downstreamPath,
clientContext,
);
await reportProxyAllFailed({
model: requestedModel,
reason: data.message,
});
return reply.code(502).send({
error: { message: data.message, type: 'upstream_error' },
});
}
const latency = Date.now() - startTime;
let estimatedCost = 0;
await recordTokenRouterEventBestEffort('estimate proxy cost', async () => {
estimatedCost = await estimateProxyCost({
site: selected.site,
account: selected.account,
modelName: upstreamModel,
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
});
});
await recordTokenRouterEventBestEffort('record channel success', () => (
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, upstreamModel)
));
await recordTokenRouterEventBestEffort('record downstream cost usage', () => (
recordDownstreamCostUsage(request, estimatedCost)
));
logProxy(selected, requestedModel, 'success', upstream.status, latency, null, retryCount, downstreamApiKeyId, estimatedCost, downstreamPath, clientContext);
return reply.code(upstream.status).send(data.value);
} catch (err: any) {
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: 0,
errorText: err.message,
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
'failed',
0,
Date.now() - startTime,
err.message,
retryCount,
downstreamApiKeyId,
0,
downstreamPath,
clientContext,
);
if (canRetryProxyChannel(retryCount)) {
retryCount++;
continue;
}
await reportProxyAllFailed({
model: requestedModel,
reason: err.message || 'network failure',
});
return reply.code(502).send({
error: { message: `Upstream error: ${err.message}`, type: 'upstream_error' },
});
}
}
});
// Image variations are intentionally unimplemented; reject uniformly with 400.
app.post('/v1/images/variations', async (_request: FastifyRequest, reply: FastifyReply) => {
  return reply.code(400).send({
    error: {
      message: 'Image variations are not supported',
      type: 'invalid_request_error',
    },
  });
});
}
/**
 * Best-effort proxy-log writer for the image routes.
 *
 * Composes a normalized error message (tagging the detected client kind,
 * session id and trace hint when present) and persists one proxy-log row.
 * Storage failures are logged and swallowed so logging never breaks requests.
 *
 * @param selected channel selection bundle (channel/account/actualModel);
 *   typed `any` — shape assumed from call sites, TODO confirm against router
 * @param modelRequested model name the downstream client asked for
 * @param status outcome label (call sites pass 'success' / 'failed')
 * @param httpStatus upstream HTTP status (0 for network-level failures)
 * @param latencyMs elapsed upstream attempt time in milliseconds
 * @param errorMessage raw upstream or exception text, null on success
 * @param retryCount zero-based attempt index at log time
 */
async function logProxy(
  selected: any,
  modelRequested: string,
  status: string,
  httpStatus: number,
  latencyMs: number,
  errorMessage: string | null,
  retryCount: number,
  downstreamApiKeyId: number | null = null,
  estimatedCost = 0,
  downstreamPath = '/v1/images/generations',
  clientContext: DownstreamClientContext | null = null,
) {
  try {
    const createdAt = formatUtcSqlDateTime(new Date());
    // Fold client fingerprint and trace hints into one normalized message.
    const normalizedErrorMessage = composeProxyLogMessage({
      clientKind: clientContext?.clientKind && clientContext.clientKind !== 'generic'
        ? clientContext.clientKind
        : null,
      sessionId: clientContext?.sessionId || null,
      traceHint: clientContext?.traceHint || null,
      downstreamPath,
      errorMessage,
    });
    await insertProxyLog({
      routeId: selected.channel.routeId,
      channelId: selected.channel.id,
      accountId: selected.account.id,
      downstreamApiKeyId,
      modelRequested,
      // Fall back to the requested model so the row never records an empty actual model.
      modelActual: selected.actualModel || modelRequested,
      status,
      httpStatus,
      latencyMs,
      promptTokens: 0,
      completionTokens: 0,
      totalTokens: 0,
      estimatedCost,
      clientFamily: clientContext?.clientKind || null,
      clientAppId: clientContext?.clientAppId || null,
      clientAppName: clientContext?.clientAppName || null,
      clientConfidence: clientContext?.clientConfidence || null,
      errorMessage: normalizedErrorMessage,
      retryCount,
      createdAt,
    });
  } catch (error) {
    // Logging is observability only — a storage failure must not affect the request.
    console.warn('[proxy/images] failed to write proxy log', error);
  }
}
/**
 * Runs a token-router bookkeeping operation in best-effort mode: any
 * synchronous throw or promise rejection is logged under the given label and
 * swallowed, so bookkeeping can never disturb request handling.
 */
async function recordTokenRouterEventBestEffort(
  label: string,
  operation: () => Promise<unknown> | unknown,
): Promise<void> {
  // Promise.resolve().then(...) funnels sync throws into the rejection path.
  await Promise.resolve()
    .then(operation)
    .catch((error) => {
      console.warn(`[proxy/images] failed to ${label}`, error);
    });
}
/**
 * Parses an upstream image-API response body as JSON.
 * Returns a discriminated result: the parsed value on success, otherwise the
 * raw body text (or a fallback message when the body is empty) for reporting.
 */
function parseUpstreamImageResponse(text: string): { ok: true; value: any } | { ok: false; message: string } {
  let parsed: unknown;
  try {
    parsed = JSON.parse(text);
  } catch {
    const message = text.length > 0 ? text : 'Upstream returned malformed JSON';
    return { ok: false, message };
  }
  return { ok: true, value: parsed };
}
+30
View File
@@ -129,6 +129,36 @@ describe('/v1/search route', () => {
});
});
it('keeps returning a successful search response when channel success bookkeeping fails', async () => {
fetchMock.mockResolvedValue(new Response(JSON.stringify({
object: 'search.result',
data: [{ title: 'AxonHub' }],
}), {
status: 200,
headers: { 'content-type': 'application/json' },
}));
recordSuccessMock.mockRejectedValueOnce(new Error('record success failed'));
const response = await app.inject({
method: 'POST',
url: '/v1/search',
headers: {
authorization: 'Bearer sk-demo',
},
payload: {
query: 'axonhub',
},
});
expect(response.statusCode).toBe(200);
expect(response.json()).toEqual({
object: 'search.result',
data: [{ title: 'AxonHub' }],
});
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(selectNextChannelMock).not.toHaveBeenCalled();
});
it('rejects max_results outside the allowed range', async () => {
const response = await app.inject({
method: 'POST',
+27 -14
View File
@@ -14,8 +14,7 @@ import { getProxyAuthContext } from '../../middleware/auth.js';
import { buildUpstreamUrl } from './upstreamUrl.js';
import { detectDownstreamClientContext, type DownstreamClientContext } from './downstreamClientContext.js';
import { insertProxyLog } from '../../services/proxyLogStore.js';
const MAX_RETRIES = 2;
import { canRetryProxyChannel, getProxyMaxChannelRetries } from '../../services/proxyChannelRetry.js';
const DEFAULT_SEARCH_MODEL = '__search';
const DEFAULT_MAX_RESULTS = 10;
const MAX_MAX_RESULTS = 20;
@@ -65,7 +64,7 @@ export async function searchProxyRoute(app: FastifyInstance) {
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= MAX_RETRIES) {
while (retryCount <= getProxyMaxChannelRetries()) {
let selected = retryCount === 0
? await tokenRouter.selectChannel(requestedModel, downstreamPolicy)
: await tokenRouter.selectNextChannel(requestedModel, excludeChannelIds, downstreamPolicy);
@@ -87,10 +86,11 @@ export async function searchProxyRoute(app: FastifyInstance) {
excludeChannelIds.push(selected.channel.id);
const targetUrl = buildUpstreamUrl(selected.site.url, '/v1/search');
const upstreamModel = selected.actualModel || requestedModel;
const forwardBody = {
...body,
max_results: maxResults,
model: selected.actualModel || requestedModel,
model: upstreamModel,
};
const startTime = Date.now();
@@ -106,11 +106,11 @@ export async function searchProxyRoute(app: FastifyInstance) {
const text = await upstream.text();
if (!upstream.ok) {
tokenRouter.recordFailure(selected.channel.id, {
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: upstream.status,
errorText: text,
modelName: selected.actualModel,
});
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
@@ -131,7 +131,7 @@ export async function searchProxyRoute(app: FastifyInstance) {
detail: `HTTP ${upstream.status}`,
});
}
if (shouldRetryProxyRequest(upstream.status, text) && retryCount < MAX_RETRIES) {
if (shouldRetryProxyRequest(upstream.status, text) && canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
@@ -146,16 +146,18 @@ export async function searchProxyRoute(app: FastifyInstance) {
try { data = JSON.parse(text); } catch { data = { data: [] }; }
const latency = Date.now() - startTime;
tokenRouter.recordSuccess(selected.channel.id, latency, 0, selected.actualModel);
await recordTokenRouterEventBestEffort('record channel success', () => (
tokenRouter.recordSuccess(selected.channel.id, latency, 0, upstreamModel)
));
recordDownstreamCostUsage(request, 0);
logProxy(selected, requestedModel, 'success', upstream.status, latency, null, retryCount, downstreamApiKeyId, clientContext, downstreamPath);
return reply.code(upstream.status).send(data);
} catch (error: any) {
tokenRouter.recordFailure(selected.channel.id, {
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: 0,
errorText: error?.message || 'network error',
modelName: selected.actualModel,
});
modelName: upstreamModel,
}));
logProxy(
selected,
requestedModel,
@@ -168,7 +170,7 @@ export async function searchProxyRoute(app: FastifyInstance) {
clientContext,
downstreamPath,
);
if (retryCount < MAX_RETRIES) {
if (canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
@@ -204,7 +206,7 @@ async function logProxy(
accountId: selected.account.id,
downstreamApiKeyId,
modelRequested,
modelActual: selected.actualModel,
modelActual: selected.actualModel || modelRequested,
status,
httpStatus,
latencyMs,
@@ -232,3 +234,14 @@ async function logProxy(
console.warn('[proxy/search] failed to write proxy log', error);
}
}
async function recordTokenRouterEventBestEffort(
label: string,
operation: () => Promise<unknown>,
): Promise<void> {
try {
await operation();
} catch (error) {
console.warn(`[proxy/search] failed to ${label}`, error);
}
}
@@ -16,6 +16,9 @@ import {
recordUpstreamEndpointSuccess,
resetUpstreamEndpointRuntimeState,
resolveUpstreamEndpointCandidates,
boundEndpointRuntimeModelKey,
MAX_ENDPOINT_RUNTIME_MODEL_KEY_LENGTH,
MODEL_KEY_HASH_SUFFIX_LENGTH,
} from './upstreamEndpoint.js';
const baseContext = {
@@ -292,6 +295,50 @@ describe('resolveUpstreamEndpointCandidates', () => {
expect(order).toEqual(['responses', 'chat', 'messages']);
});
it('keeps learned endpoint state scoped to the model key', async () => {
recordUpstreamEndpointSuccess({
siteId: baseContext.site.id,
endpoint: 'responses',
downstreamFormat: 'openai',
modelName: 'gpt-5.3',
});
const learnedOrder = await resolveUpstreamEndpointCandidates(
{
...baseContext,
site: { ...baseContext.site, platform: 'new-api' },
},
'gpt-5.3',
'openai',
);
const unrelatedModelOrder = await resolveUpstreamEndpointCandidates(
{
...baseContext,
site: { ...baseContext.site, platform: 'new-api' },
},
'gpt-4.1',
'openai',
);
expect(learnedOrder).toEqual(['responses', 'chat', 'messages']);
expect(unrelatedModelOrder).toEqual(['chat', 'messages', 'responses']);
});
it('bounds runtime model keys before storing them', () => {
const longModelName = 'gpt-' + 'a'.repeat(MAX_ENDPOINT_RUNTIME_MODEL_KEY_LENGTH + 32);
const boundedKey = boundEndpointRuntimeModelKey(longModelName);
expect(boundedKey.length).toBeLessThanOrEqual(
MAX_ENDPOINT_RUNTIME_MODEL_KEY_LENGTH + 1 + MODEL_KEY_HASH_SUFFIX_LENGTH,
);
expect(boundedKey.startsWith(longModelName.slice(0, MAX_ENDPOINT_RUNTIME_MODEL_KEY_LENGTH))).toBe(true);
expect(boundedKey).toMatch(
new RegExp(`-[0-9a-f]{${MODEL_KEY_HASH_SUFFIX_LENGTH}}$`),
);
expect(boundEndpointRuntimeModelKey(longModelName)).toEqual(boundedKey);
});
it('keeps remote-document-url requests on a separate runtime preference bucket from inline document requests', async () => {
recordUpstreamEndpointSuccess({
siteId: baseContext.site.id,
+64 -3
View File
@@ -1,4 +1,4 @@
import { randomUUID } from 'node:crypto';
import { randomUUID, createHash } from 'node:crypto';
import {
rankConversationFileEndpoints,
type ConversationFileInputSummary,
@@ -45,6 +45,7 @@ export type UpstreamEndpoint = 'chat' | 'messages' | 'responses';
export type EndpointPreference = DownstreamFormat | 'responses';
type EndpointCapabilityProfile = {
modelKey: string;
preferMessagesForClaudeModel: boolean;
hasImageInput: boolean;
hasAudioInput: boolean;
@@ -56,6 +57,7 @@ type EndpointCapabilityProfile = {
type EndpointRuntimeState = {
preferredEndpoint: UpstreamEndpoint | null;
preferredUpdatedAtMs: number;
lastTouchedAtMs: number;
blockedUntilMsByEndpoint: Partial<Record<UpstreamEndpoint, number>>;
};
@@ -75,8 +77,24 @@ type ChannelContext = {
const ENDPOINT_RUNTIME_PREFERRED_TTL_MS = 24 * 60 * 60 * 1000;
const ENDPOINT_RUNTIME_BLOCK_TTL_MS = 6 * 60 * 60 * 1000;
const MAX_ENDPOINT_RUNTIME_STATES = 512;
// Limits on runtime-state model keys: keys longer than the cap are truncated
// and disambiguated with a short sha256 suffix.
export const MAX_ENDPOINT_RUNTIME_MODEL_KEY_LENGTH = 64;
export const MODEL_KEY_HASH_SUFFIX_LENGTH = 8;

const endpointRuntimeStates = new Map<string, EndpointRuntimeState>();

/**
 * Caps a runtime model key at MAX_ENDPOINT_RUNTIME_MODEL_KEY_LENGTH characters.
 * Over-long keys keep their prefix and gain a deterministic sha256-derived
 * suffix so distinct long names stay distinct while the map key stays bounded.
 */
export function boundEndpointRuntimeModelKey(value: string): string {
  if (value.length <= MAX_ENDPOINT_RUNTIME_MODEL_KEY_LENGTH) {
    return value;
  }
  const truncated = value.slice(0, MAX_ENDPOINT_RUNTIME_MODEL_KEY_LENGTH);
  const digest = createHash('sha256').update(value).digest('hex');
  const suffix = digest.slice(0, MODEL_KEY_HASH_SUFFIX_LENGTH);
  return `${truncated}-${suffix}`;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return !!value && typeof value === 'object';
}
@@ -85,6 +103,14 @@ function asTrimmedString(value: unknown): string {
return typeof value === 'string' ? value.trim() : '';
}
function normalizeEndpointRuntimeModelKey(...values: Array<unknown>): string {
for (const value of values) {
const normalized = asTrimmedString(value).toLowerCase();
if (normalized) return boundEndpointRuntimeModelKey(normalized);
}
return boundEndpointRuntimeModelKey('unknown-model');
}
function resolveRequestedModelForPayloadRules(input: {
modelName: string;
openaiBody: Record<string, unknown>;
@@ -509,6 +535,7 @@ function buildEndpointCapabilityProfile(input?: {
}): EndpointCapabilityProfile {
const conversationFileSummary = input?.requestCapabilities?.conversationFileSummary;
return {
modelKey: normalizeEndpointRuntimeModelKey(input?.modelName, input?.requestedModelHint),
preferMessagesForClaudeModel: (
isClaudeFamilyModel(asTrimmedString(input?.modelName))
|| isClaudeFamilyModel(asTrimmedString(input?.requestedModelHint))
@@ -545,7 +572,7 @@ function buildEndpointRuntimeStateKey(input: {
return [
String(input.siteId),
input.downstreamFormat,
capabilityProfile.preferMessagesForClaudeModel ? 'claude' : 'generic',
capabilityProfile.modelKey,
capabilityProfile.hasNonImageFileInput ? 'files' : 'nofiles',
capabilityProfile.hasRemoteDocumentUrl ? 'remoteurl' : 'noremoteurl',
capabilityProfile.wantsNativeResponsesReasoning ? 'reasoning' : 'noreasoning',
@@ -553,15 +580,21 @@ function buildEndpointRuntimeStateKey(input: {
}
function getOrCreateEndpointRuntimeState(key: string, nowMs = Date.now()): EndpointRuntimeState {
sweepEndpointRuntimeStates(nowMs);
const existing = endpointRuntimeStates.get(key);
if (existing) return existing;
if (existing) {
existing.lastTouchedAtMs = nowMs;
return existing;
}
const initial: EndpointRuntimeState = {
preferredEndpoint: null,
preferredUpdatedAtMs: nowMs,
lastTouchedAtMs: nowMs,
blockedUntilMsByEndpoint: {},
};
endpointRuntimeStates.set(key, initial);
enforceEndpointRuntimeStateLimit();
return initial;
}
@@ -588,6 +621,7 @@ function applyEndpointRuntimePreference(
): UpstreamEndpoint[] {
const state = endpointRuntimeStates.get(key);
if (!state || candidates.length <= 1) return candidates;
state.lastTouchedAtMs = nowMs;
const blocked = new Set<UpstreamEndpoint>();
for (const endpoint of candidates) {
@@ -617,6 +651,33 @@ function applyEndpointRuntimePreference(
return next;
}
function sweepEndpointRuntimeStates(nowMs = Date.now()): void {
for (const [key, state] of endpointRuntimeStates.entries()) {
const hasActiveBlock = Object.values(state.blockedUntilMsByEndpoint).some((untilMs) => (
typeof untilMs === 'number' && untilMs > nowMs
));
const preferredFresh = (
!!state.preferredEndpoint
&& (state.preferredUpdatedAtMs + ENDPOINT_RUNTIME_PREFERRED_TTL_MS) > nowMs
);
const recentlyTouched = (state.lastTouchedAtMs + ENDPOINT_RUNTIME_PREFERRED_TTL_MS) > nowMs;
if (!hasActiveBlock && !preferredFresh && !recentlyTouched) {
endpointRuntimeStates.delete(key);
}
}
}
function enforceEndpointRuntimeStateLimit(): void {
if (endpointRuntimeStates.size <= MAX_ENDPOINT_RUNTIME_STATES) return;
const entries = [...endpointRuntimeStates.entries()]
.sort((left, right) => left[1].lastTouchedAtMs - right[1].lastTouchedAtMs);
const overflowCount = endpointRuntimeStates.size - MAX_ENDPOINT_RUNTIME_STATES;
for (const [key] of entries.slice(0, overflowCount)) {
endpointRuntimeStates.delete(key);
}
}
function inferSuggestedEndpointFromError(errorText?: string | null): UpstreamEndpoint | null {
const text = (errorText || '').toLowerCase();
if (!text) return null;
+33 -12
View File
@@ -17,8 +17,7 @@ import {
refreshProxyVideoTaskSnapshot,
saveProxyVideoTask,
} from '../../services/proxyVideoTaskStore.js';
const MAX_RETRIES = 2;
import { canRetryProxyChannel, getProxyMaxChannelRetries } from '../../services/proxyChannelRetry.js';
function rewriteVideoResponsePublicId(payload: unknown, publicId: string): unknown {
if (!payload || typeof payload !== 'object') return payload;
@@ -51,7 +50,7 @@ export async function videosProxyRoute(app: FastifyInstance) {
const excludeChannelIds: number[] = [];
let retryCount = 0;
while (retryCount <= MAX_RETRIES) {
while (retryCount <= getProxyMaxChannelRetries()) {
let selected = retryCount === 0
? await tokenRouter.selectChannel(requestedModel, downstreamPolicy)
: await tokenRouter.selectNextChannel(requestedModel, excludeChannelIds, downstreamPolicy);
@@ -73,6 +72,7 @@ export async function videosProxyRoute(app: FastifyInstance) {
excludeChannelIds.push(selected.channel.id);
const targetUrl = buildUpstreamUrl(selected.site.url, '/v1/videos');
const upstreamModel = selected.actualModel || requestedModel;
const startTime = Date.now();
try {
@@ -84,7 +84,7 @@ export async function videosProxyRoute(app: FastifyInstance) {
Authorization: `Bearer ${selected.tokenValue}`,
},
body: cloneFormDataWithOverrides(multipartForm, {
model: selected.actualModel || requestedModel,
model: upstreamModel,
}) as any,
}, accountProxy)
: withSiteRecordProxyRequestInit(selected.site, {
@@ -95,14 +95,18 @@ export async function videosProxyRoute(app: FastifyInstance) {
},
body: JSON.stringify({
...(jsonBody || {}),
model: selected.actualModel || requestedModel,
model: upstreamModel,
}),
}, accountProxy);
const upstream = await fetch(targetUrl, requestInit);
const text = await upstream.text();
if (!upstream.ok) {
tokenRouter.recordFailure(selected.channel.id, selected.actualModel);
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: upstream.status,
errorText: text,
modelName: upstreamModel,
}));
if (isTokenExpiredError({ status: upstream.status, message: text })) {
await reportTokenExpired({
accountId: selected.account.id,
@@ -111,7 +115,7 @@ export async function videosProxyRoute(app: FastifyInstance) {
detail: `HTTP ${upstream.status}`,
});
}
if (shouldRetryProxyRequest(upstream.status, text) && retryCount < MAX_RETRIES) {
if (shouldRetryProxyRequest(upstream.status, text) && canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
@@ -136,7 +140,7 @@ export async function videosProxyRoute(app: FastifyInstance) {
siteUrl: selected.site.url,
tokenValue: selected.tokenValue,
requestedModel,
actualModel: selected.actualModel || requestedModel,
actualModel: upstreamModel,
channelId: typeof selected.channel.id === 'number' ? selected.channel.id : null,
accountId: typeof selected.account.id === 'number' ? selected.account.id : null,
statusSnapshot: data,
@@ -150,17 +154,23 @@ export async function videosProxyRoute(app: FastifyInstance) {
const estimatedCost = await estimateProxyCost({
site: selected.site,
account: selected.account,
modelName: selected.actualModel || requestedModel,
modelName: upstreamModel,
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
});
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, selected.actualModel);
await recordTokenRouterEventBestEffort('record channel success', () => (
tokenRouter.recordSuccess(selected.channel.id, latency, estimatedCost, upstreamModel)
));
recordDownstreamCostUsage(request, estimatedCost);
return reply.code(upstream.status).send(rewriteVideoResponsePublicId(data, mapping.publicId));
} catch (error: any) {
tokenRouter.recordFailure(selected.channel.id, selected.actualModel);
if (retryCount < MAX_RETRIES) {
await recordTokenRouterEventBestEffort('record channel failure', () => tokenRouter.recordFailure(selected.channel.id, {
status: 0,
errorText: error?.message || 'network failure',
modelName: upstreamModel,
}));
if (canRetryProxyChannel(retryCount)) {
retryCount += 1;
continue;
}
@@ -232,3 +242,14 @@ export async function videosProxyRoute(app: FastifyInstance) {
});
});
}
async function recordTokenRouterEventBestEffort(
label: string,
operation: () => Promise<unknown>,
): Promise<void> {
try {
await operation();
} catch (error) {
console.warn(`[proxy/videos] failed to ${label}`, error);
}
}
@@ -99,4 +99,42 @@ describe('Sub2ApiAdapter subscription summary parsing', () => {
],
});
});
it('preserves explicit zero activeCount from summary payload', async () => {
const adapter = new MockSub2ApiAdapter({
'/api/v1/auth/me': {
code: 0,
message: 'success',
data: { id: 1, username: 'demo', email: 'demo@example.com', balance: 8.5 },
},
'/api/v1/subscriptions/summary': {
code: 0,
message: 'success',
data: {
active_count: 0,
total_used_usd: 0,
subscriptions: [
{
id: 11,
group_name: 'Expired',
status: 'expired',
},
],
},
},
});
const balance = await adapter.getBalance('https://sub2api.example.com', 'jwt-token');
expect(balance.subscriptionSummary).toEqual({
activeCount: 0,
totalUsedUsd: 0,
subscriptions: [
{
id: 11,
groupName: 'Expired',
status: 'expired',
},
],
});
});
});
+10 -1
View File
@@ -37,6 +37,15 @@ export class Sub2ApiAdapter extends BasePlatformAdapter {
return undefined;
}
// Parses a non-negative integer from an unknown payload field.
// Accepts finite numbers (truncated toward zero) and base-10 numeric strings;
// anything else — including negatives — yields undefined.
// NOTE(review): Number.parseInt accepts trailing garbage ('12abc' -> 12);
// presumably intentional leniency for upstream payloads — confirm.
private parseNonNegativeInteger(raw: unknown): number | undefined {
  if (typeof raw === 'number') {
    return Number.isFinite(raw) && raw >= 0 ? Math.trunc(raw) : undefined;
  }
  if (typeof raw !== 'string') return undefined;
  const parsed = Number.parseInt(raw.trim(), 10);
  return Number.isNaN(parsed) || parsed < 0 ? undefined : parsed;
}
private parseNonNegativeNumber(raw: unknown): number | undefined {
if (typeof raw === 'number' && Number.isFinite(raw) && raw >= 0) {
return this.roundCurrency(raw);
@@ -178,7 +187,7 @@ export class Sub2ApiAdapter extends BasePlatformAdapter {
? payload as Record<string, unknown>
: {};
const subscriptions = this.parseSubscriptionItems(payload);
const activeCount = this.parsePositiveInteger(body.active_count ?? body.activeCount);
const activeCount = this.parseNonNegativeInteger(body.active_count ?? body.activeCount);
const totalUsedUsd = this.parseNonNegativeNumber(body.total_used_usd ?? body.totalUsedUsd);
const inferredUsedUsd = subscriptions.reduce((sum, item) => sum + (item.monthlyUsedUsd || 0), 0);
@@ -0,0 +1,37 @@
import { afterEach, describe, expect, it } from 'vitest';
import { buildConfig, config } from '../config.js';
import {
canRetryProxyChannel,
getProxyMaxChannelAttempts,
getProxyMaxChannelRetries,
} from './proxyChannelRetry.js';
const originalProxyMaxChannelAttempts = config.proxyMaxChannelAttempts;
afterEach(() => {
config.proxyMaxChannelAttempts = originalProxyMaxChannelAttempts;
});
describe('proxyChannelRetry', () => {
it('parses proxy max channel attempts from config with a safer default', () => {
expect(buildConfig({} as NodeJS.ProcessEnv).proxyMaxChannelAttempts).toBe(3);
expect(buildConfig({ PROXY_MAX_CHANNEL_ATTEMPTS: '3' } as NodeJS.ProcessEnv).proxyMaxChannelAttempts).toBe(3);
});
it('derives retry budget from total channel attempts', () => {
config.proxyMaxChannelAttempts = 5;
expect(getProxyMaxChannelAttempts()).toBe(5);
expect(getProxyMaxChannelRetries()).toBe(4);
expect(canRetryProxyChannel(3)).toBe(true);
expect(canRetryProxyChannel(4)).toBe(false);
});
it('clamps invalid runtime config to at least one channel attempt', () => {
config.proxyMaxChannelAttempts = 0;
expect(getProxyMaxChannelAttempts()).toBe(1);
expect(getProxyMaxChannelRetries()).toBe(0);
expect(canRetryProxyChannel(0)).toBe(false);
});
});
+14
View File
@@ -0,0 +1,14 @@
import { config } from '../config.js';
/**
 * Resolves the configured total number of upstream channel attempts for a
 * single proxy request. Missing or non-positive runtime config collapses to
 * one attempt.
 */
export function getProxyMaxChannelAttempts(): number {
  const configured = Math.trunc(config.proxyMaxChannelAttempts || 0);
  return configured > 0 ? configured : 1;
}

/** Retry budget: total attempts minus the initial attempt, never negative. */
export function getProxyMaxChannelRetries(): number {
  return Math.max(0, getProxyMaxChannelAttempts() - 1);
}

/** True while the zero-based retry counter is still under the retry budget. */
export function canRetryProxyChannel(retryCount: number): boolean {
  return retryCount < getProxyMaxChannelRetries();
}
@@ -43,4 +43,13 @@ describe('proxyRetryPolicy', () => {
shouldRetryProxyRequest(400, 'Unsupported legacy protocol: /v1/chat/completions is not supported. Please use /v1/responses.'),
).toBe(true);
});
it('does not retry client-side timeout validation errors', () => {
expect(
shouldRetryProxyRequest(400, '{"error":{"message":"timeout must be <= 60"}}'),
).toBe(false);
expect(
shouldRetryProxyRequest(400, '{"error":{"message":"invalid timeout parameter"}}'),
).toBe(false);
});
});
+5 -1
View File
@@ -14,6 +14,10 @@ const MODEL_UNSUPPORTED_PATTERNS: RegExp[] = [
/you\s+do\s+not\s+have\s+access\s+to\s+the\s+model/i,
];
export const RETRYABLE_TIMEOUT_PATTERNS: RegExp[] = [
/(request timed out|connection timed out|read timeout|\btimed out\b)/i,
];
const RETRYABLE_CHANNEL_LOCAL_PATTERNS: RegExp[] = [
/unsupported\s+legacy\s+protocol/i,
/please\s+use\s+\/v1\/responses/i,
@@ -34,7 +38,7 @@ const RETRYABLE_CHANNEL_LOCAL_PATTERNS: RegExp[] = [
/gateway\s+time-?out/i,
/service\s+unavailable/i,
/cpu\s+overloaded/i,
/timeout/i,
...RETRYABLE_TIMEOUT_PATTERNS,
];
const NON_RETRYABLE_REQUEST_PATTERNS: RegExp[] = [
@@ -546,6 +546,67 @@ describe('TokenRouter selection scoring', () => {
expect((recoveredCandidateB?.probability || 0)).toBeLessThan(70);
});
it('does not open a site breaker for repeated timeout validation errors', async () => {
config.routingWeights = {
baseWeightFactor: 1,
valueScoreFactor: 0,
costWeight: 0,
balanceWeight: 0,
usageWeight: 0,
};
const route = await createRoute('gpt-5.4');
const siteA = await createSite('timeout-validation-a');
const accountA = await createAccount(siteA.id, 'timeout-validation-user-a');
const tokenA = await createToken(accountA.id, 'timeout-validation-token-a');
const channelA = await db.insert(schema.routeChannels).values({
routeId: route.id,
accountId: accountA.id,
tokenId: tokenA.id,
priority: 0,
weight: 10,
enabled: true,
}).returning().get();
const siteB = await createSite('timeout-validation-b');
const accountB = await createAccount(siteB.id, 'timeout-validation-user-b');
const tokenB = await createToken(accountB.id, 'timeout-validation-token-b');
const channelB = await db.insert(schema.routeChannels).values({
routeId: route.id,
accountId: accountB.id,
tokenId: tokenB.id,
priority: 0,
weight: 10,
enabled: true,
}).returning().get();
const router = new TokenRouter();
for (let index = 0; index < 3; index += 1) {
await router.recordFailure(channelA.id, {
status: 400,
errorText: 'invalid timeout parameter',
});
}
await db.update(schema.routeChannels).set({
cooldownUntil: null,
lastFailAt: null,
failCount: 0,
}).where(eq(schema.routeChannels.id, channelA.id)).run();
invalidateTokenRouterCache();
const decision = await router.explainSelection('gpt-5.4');
const candidateA = decision.candidates.find((candidate) => candidate.channelId === channelA.id);
const candidateB = decision.candidates.find((candidate) => candidate.channelId === channelB.id);
expect(candidateA).toBeTruthy();
expect(candidateB).toBeTruthy();
expect(candidateA?.reason || '').not.toContain('站点熔断');
expect(candidateB?.reason || '').not.toContain('站点熔断');
expect(decision.summary.join(' ')).not.toContain('站点熔断避让');
expect((candidateA?.probability || 0)).toBeGreaterThan(0);
});
it('uses persisted site success and latency history to prefer historically healthier sites', async () => {
config.routingWeights = {
baseWeightFactor: 1,
+9 -4
View File
@@ -3,6 +3,7 @@ import { db, schema } from '../db/index.js';
import { upsertSetting } from '../db/upsertSetting.js';
import { config } from '../config.js';
import { getCachedModelRoutingReferenceCost, refreshModelPricingCatalog } from './modelPricingService.js';
import { RETRYABLE_TIMEOUT_PATTERNS } from './proxyRetryPolicy.js';
import {
normalizeRouteRoutingStrategy,
type RouteRoutingStrategy,
@@ -133,8 +134,7 @@ const SITE_VALIDATION_FAILURE_PATTERNS: RegExp[] = [
const SITE_TRANSIENT_FAILURE_PATTERNS: RegExp[] = [
/bad\s+gateway/i,
/gateway\s+time-?out/i,
/timed?\s*out/i,
/timeout/i,
...RETRYABLE_TIMEOUT_PATTERNS,
/service\s+unavailable/i,
/temporar(?:y|ily)\s+unavailable/i,
/cpu\s+overloaded/i,
@@ -505,7 +505,9 @@ function scheduleSiteRuntimeHealthPersistence(): void {
if (siteRuntimeHealthSaveTimer) return;
siteRuntimeHealthSaveTimer = setTimeout(() => {
siteRuntimeHealthSaveTimer = null;
void persistSiteRuntimeHealthState();
void persistSiteRuntimeHealthState().catch((error) => {
console.error('Failed to persist site runtime health state', error);
});
}, SITE_RUNTIME_HEALTH_PERSIST_DEBOUNCE_MS);
}
@@ -560,8 +562,11 @@ async function ensureSiteRuntimeHealthStateLoaded(): Promise<void> {
siteRuntimeHealthLoadPromise = (async () => {
try {
await loadSiteRuntimeHealthStateFromSettings();
} finally {
siteRuntimeHealthLoaded = true;
} catch (error) {
console.warn('Failed to restore site runtime health state from settings', error);
siteRuntimeHealthLoadPromise = null;
siteRuntimeHealthLoaded = false;
}
})();
}
@@ -320,6 +320,123 @@ describe('sanitizeResponsesBodyForProxy', () => {
});
expect(result.max_completion_tokens).toBeUndefined();
});
it('normalizes failed input item statuses before proxying Responses requests', () => {
const result = sanitizeResponsesBodyForProxy(
{
model: 'gpt-5',
input: [
{
role: 'assistant',
status: 'failed',
content: [
{
type: 'output_text',
text: 'tool step failed',
},
],
},
{
type: 'function_call',
call_id: 'call_1',
name: 'lookup_weather',
arguments: '{"city":"Shanghai"}',
status: 'failed',
},
{
type: 'function_call_output',
call_id: 'call_1',
output: '{"error":"timeout"}',
status: 'completed',
},
],
},
'gpt-5',
false,
);
expect(result.input).toEqual([
{
type: 'message',
role: 'assistant',
status: 'incomplete',
content: [
{
type: 'output_text',
text: 'tool step failed',
},
],
},
{
type: 'function_call',
call_id: 'call_1',
name: 'lookup_weather',
arguments: '{"city":"Shanghai"}',
status: 'incomplete',
},
{
type: 'function_call_output',
call_id: 'call_1',
output: '{"error":"timeout"}',
status: 'completed',
},
]);
});
it('drops unsupported input item statuses before proxying Responses requests', () => {
const result = sanitizeResponsesBodyForProxy(
{
model: 'gpt-5',
input: [
{
type: 'message',
role: 'user',
status: 'errored',
content: 'hello',
},
{
type: 'function_call',
call_id: 'call_2',
name: 'lookup_weather',
arguments: '{}',
status: 'invalid',
},
{
type: 'reasoning',
id: 'rs_1',
status: 'broken',
summary: [],
},
],
},
'gpt-5',
false,
);
expect(result.input).toEqual([
{
type: 'message',
role: 'user',
content: [
{
type: 'input_text',
text: 'hello',
},
],
},
{
type: 'function_call',
call_id: 'call_2',
name: 'lookup_weather',
arguments: '{}',
},
{
type: 'reasoning',
id: 'rs_1',
summary: [],
},
]);
});
});
describe('convertOpenAiBodyToResponsesBody', () => {
@@ -8,6 +8,36 @@ function asTrimmedString(value: unknown): string {
return typeof value === 'string' ? value.trim() : '';
}
// Input-item statuses the upstream Responses API accepts verbatim.
const ALLOWED_RESPONSES_INPUT_STATUSES = new Set(['in_progress', 'completed', 'incomplete']);

/**
 * Maps an arbitrary status value onto one the Responses API accepts.
 * 'failed' is coerced to 'incomplete'; blank or unrecognized values
 * yield undefined so the caller can drop the property.
 */
function normalizeResponsesInputStatus(value: unknown): string | undefined {
  const status = asTrimmedString(value).toLowerCase();
  if (status === 'failed') return 'incomplete';
  if (status && ALLOWED_RESPONSES_INPUT_STATUSES.has(status)) return status;
  return undefined;
}
/**
 * Returns a copy of `item` whose `status` field is safe to proxy upstream:
 * a recognized status is kept (with 'failed' mapped to 'incomplete'), while
 * an unrecognized `status` property is removed entirely. Items carrying no
 * `status` property at all are returned unchanged.
 */
function withNormalizedResponsesInputStatus(item: Record<string, unknown>): Record<string, unknown> {
  const status = normalizeResponsesInputStatus(item.status);
  if (status) {
    return { ...item, status };
  }
  if (Object.prototype.hasOwnProperty.call(item, 'status')) {
    // Strip the unsupported status instead of forwarding it upstream.
    const { status: _dropped, ...withoutStatus } = item;
    return withoutStatus;
  }
  return item;
}
function firstNonEmptyTrimmedString(...values: unknown[]): string {
for (const value of values) {
const normalized = asTrimmedString(value);
@@ -139,7 +169,7 @@ function toResponsesInputMessageFromText(text: string): Record<string, unknown>
export function normalizeResponsesMessageItem(item: Record<string, unknown>): Record<string, unknown> {
const type = asTrimmedString(item.type).toLowerCase();
if (type === 'function_call' || type === 'function_call_output') {
return item;
return withNormalizedResponsesInputStatus(item);
}
const role = asTrimmedString(item.role).toLowerCase() || 'user';
@@ -149,20 +179,20 @@ export function normalizeResponsesMessageItem(item: Record<string, unknown>): Re
);
if (type === 'message') {
return {
return withNormalizedResponsesInputStatus({
...item,
role,
content: normalizedContent,
};
});
}
if (asTrimmedString(item.role)) {
return {
return withNormalizedResponsesInputStatus({
...item,
type: 'message',
role,
content: normalizedContent,
};
});
}
if (typeof item.content === 'string' || typeof item.text === 'string') {
@@ -170,7 +200,7 @@ export function normalizeResponsesMessageItem(item: Record<string, unknown>): Re
return text ? toResponsesInputMessageFromText(text) : item;
}
return item;
return withNormalizedResponsesInputStatus(item);
}
export function normalizeResponsesInputForCompatibility(input: unknown): unknown {
@@ -23,6 +23,8 @@ type EndpointAttemptContext = {
type EndpointRecoverResult = {
upstream: UndiciResponse;
upstreamPath: string;
request?: CompatibilityRequest;
targetUrl?: string;
} | null;
type CompatibilityRequest = {
@@ -70,6 +72,7 @@ export function createChatEndpointStrategy(input: CreateChatEndpointStrategyInpu
return {
upstream: normalizedResponse,
upstreamPath: normalizedClaudeRequest.path,
request: normalizedClaudeRequest,
};
}
@@ -104,6 +107,8 @@ export function createChatEndpointStrategy(input: CreateChatEndpointStrategyInpu
return {
upstream: minimalResponse,
upstreamPath: minimalRequest.path,
request: minimalRequest,
targetUrl: ctx.targetUrl,
};
}
+2 -1
View File
@@ -2092,7 +2092,8 @@ body {
}
.site-balance-inline {
display: inline-flex;
display: flex;
width: 100%;
flex-wrap: wrap;
align-items: baseline;
gap: 6px;
+4 -12
View File
@@ -24,6 +24,10 @@ import {
} from './helpers/routeMissingTokenHints.js';
import { buildVisibleRouteList } from './helpers/routeListVisibility.js';
import { buildZeroChannelPlaceholderRoutes } from './helpers/zeroChannelRoutes.js';
import {
getRouteRoutingStrategyLabel,
normalizeRouteRoutingStrategyValue,
} from './token-routes/routingStrategy.js';
import type {
RouteSortBy,
@@ -89,18 +93,6 @@ const EMPTY_ROUTE_FORM: RouteEditorForm = {
advancedOpen: false,
};
function normalizeRouteRoutingStrategyValue(value?: RouteRoutingStrategy | null): RouteRoutingStrategy {
if (value === 'round_robin' || value === 'stable_first') return value;
return 'weighted';
}
function getRouteRoutingStrategyLabel(value?: RouteRoutingStrategy | null): string {
const strategy = normalizeRouteRoutingStrategyValue(value);
if (strategy === 'round_robin') return tr('轮询');
if (strategy === 'stable_first') return tr('稳定优先');
return tr('权重随机');
}
function getRouteRoutingStrategySuccessMessage(value: RouteRoutingStrategy): string {
if (value === 'round_robin') return '已切换为轮询策略';
if (value === 'stable_first') return '已切换为稳定优先策略';
+7 -15
View File
@@ -28,6 +28,10 @@ import type {
} from './types.js';
import type { RouteCandidateView, RouteTokenOption } from '../helpers/routeModelCandidatesIndex.js';
import { SortableChannelRow } from './SortableChannelRow.js';
import {
getRouteRoutingStrategyLabel,
normalizeRouteRoutingStrategyValue,
} from './routingStrategy.js';
import {
isRouteExactModel,
isExplicitGroupRoute,
@@ -76,18 +80,6 @@ type RouteCardProps = {
onToggleSourceGroup: (groupKey: string) => void;
};
function normalizeRoutingStrategy(value?: RouteRoutingStrategy | null): RouteRoutingStrategy {
if (value === 'round_robin' || value === 'stable_first') return value;
return 'weighted';
}
function getRoutingStrategyLabel(value?: RouteRoutingStrategy | null): string {
const strategy = normalizeRoutingStrategy(value);
if (strategy === 'round_robin') return tr('轮询');
if (strategy === 'stable_first') return tr('稳定优先');
return tr('权重随机');
}
function AnimatedCollapseSection({ open, children }: { open: boolean; children: ReactNode }) {
const presence = useAnimatedVisibility(open, 220);
if (!presence.shouldRender) return null;
@@ -139,7 +131,7 @@ function RouteCardInner({
const readOnlyRoute = route.kind === 'zero_channel' || route.readOnly === true || route.isVirtual === true;
const channelManagementDisabled = explicitGroupRoute;
const title = resolveRouteTitle(route);
const routingStrategy = normalizeRoutingStrategy(route.routingStrategy);
const routingStrategy = normalizeRouteRoutingStrategyValue(route.routingStrategy);
const routingStrategyOptions = [
{
value: 'weighted',
@@ -252,7 +244,7 @@ function RouteCardInner({
</span>
) : (
<span className="badge badge-muted" style={{ fontSize: 10, flexShrink: 0 }}>
{getRoutingStrategyLabel(routingStrategy)}
{getRouteRoutingStrategyLabel(routingStrategy)}
</span>
)}
@@ -394,7 +386,7 @@ function RouteCardInner({
{explicitGroupRoute ? (
<div style={{ fontSize: 12, color: 'var(--color-text-muted)', marginBottom: 10 }}>
{tr('该群组会将多个来源模型聚合为一个对外模型名;通道信息继承自来源模型,当前仅支持查看,不支持直接维护。')}
{tr('该群组会将多个来源模型聚合为一个对外模型名;当前策略以群组设置为准,来源模型会尽量跟随同步,但已单独自定义或被其他群组复用的来源模型不会被覆盖。')}
</div>
) : !exactRoute ? (
<div style={{ fontSize: 12, color: 'var(--color-text-muted)', marginBottom: 10 }}>
@@ -0,0 +1,14 @@
import { tr } from '../../i18n.js';
import type { RouteRoutingStrategy } from './types.js';
/**
 * Coerces a possibly-missing stored strategy value onto a supported
 * routing strategy, defaulting anything unrecognized to 'weighted'.
 */
export function normalizeRouteRoutingStrategyValue(value?: RouteRoutingStrategy | null): RouteRoutingStrategy {
  switch (value) {
    case 'round_robin':
    case 'stable_first':
      return value;
    default:
      return 'weighted';
  }
}
/**
 * Human-readable, translated label for a routing strategy; unknown values
 * fall back to the weighted-random label via normalization.
 */
export function getRouteRoutingStrategyLabel(value?: RouteRoutingStrategy | null): string {
  switch (normalizeRouteRoutingStrategyValue(value)) {
    case 'round_robin':
      return tr('轮询');
    case 'stable_first':
      return tr('稳定优先');
    default:
      return tr('权重随机');
  }
}
+33 -13
View File
@@ -66,15 +66,21 @@ require_cmd docker
require_cmd tar
require_cmd git
require_file "$COMPOSE_FILE"
require_file "$COMPOSE_OVERRIDE_FILE"
require_file "$ENV_FILE"
COMPOSE_ARGS=(
-f "$COMPOSE_FILE"
-f "$COMPOSE_OVERRIDE_FILE"
--env-file "$ENV_FILE"
)
if [ -f "$COMPOSE_OVERRIDE_FILE" ]; then
COMPOSE_ARGS=(
-f "$COMPOSE_FILE"
-f "$COMPOSE_OVERRIDE_FILE"
--env-file "$ENV_FILE"
)
fi
if [ "$DO_PULL" -eq 1 ]; then
echo "[0/5] Checking git worktree..."
if [ -n "$(git -C "$ROOT_DIR" status --porcelain)" ]; then
@@ -89,36 +95,50 @@ else
fi
BACKUP_FILE=""
RESTORE_ON_ERROR=0
restore_if_failed() {
if [ "$RESTORE_ON_ERROR" -eq 1 ]; then
echo "Update failed after containers were stopped; attempting to restore service..." >&2
docker compose "${COMPOSE_ARGS[@]}" up -d --force-recreate || true
fi
}
trap restore_if_failed ERR
if [ "$DO_PULL" -eq 1 ]; then
echo "[2/5] Stopping current containers..."
else
echo "[2/4] Stopping current containers..."
fi
docker compose "${COMPOSE_ARGS[@]}" down --remove-orphans
RESTORE_ON_ERROR=1
if [ "$SKIP_BACKUP" -eq 0 ]; then
mkdir -p "$DATA_DIR"
BACKUP_FILE="$ROOT_DIR/docker/data-backup-$(date +%Y%m%d-%H%M%S).tar.gz"
if [ "$DO_PULL" -eq 1 ]; then
echo "[2/5] Backing up data directory..."
echo "[3/5] Backing up data directory..."
else
echo "[2/4] Backing up data directory..."
echo "[3/4] Backing up data directory..."
fi
tar -czf "$BACKUP_FILE" -C "$DATA_DIR" .
else
if [ "$DO_PULL" -eq 1 ]; then
echo "[2/5] Skipping data backup by request..."
echo "[3/5] Skipping data backup by request..."
else
echo "[2/4] Skipping data backup by request..."
echo "[3/4] Skipping data backup by request..."
fi
fi
if [ "$DO_PULL" -eq 1 ]; then
echo "[3/5] Stopping current containers..."
else
echo "[3/4] Stopping current containers..."
fi
docker compose "${COMPOSE_ARGS[@]}" down --remove-orphans
if [ "$DO_PULL" -eq 1 ]; then
echo "[4/5] Rebuilding and starting containers..."
else
echo "[4/4] Rebuilding and starting containers..."
fi
docker compose "${COMPOSE_ARGS[@]}" up -d --build --force-recreate
RESTORE_ON_ERROR=0
trap - ERR
echo
echo "Container status:"