From d3dfcd924201d71b434af3d77343b5229b6ed75e Mon Sep 17 00:00:00 2001
From: Valere
Date: Mon, 26 Feb 2024 15:07:28 +0100
Subject: [PATCH] Add basic retry for rust crypto outgoing requests (#4061)

* Add basic retry for outgoing requests
* Update doc
* Remove 504 from retryable
* Retry all 5xx and clarify client timeouts
* code review cleaning
* do not retry rust request if M_TOO_LARGE
* refactor use common retry alg between scheduler and rust requests
* Code review, cleaning and doc
---
 .../OutgoingRequestProcessor.spec.ts        | 301 +++++++++++++++++-
 src/http-api/utils.ts                       |  46 +++
 src/rust-crypto/OutgoingRequestProcessor.ts |  49 ++-
 src/scheduler.ts                            |  31 +-
 4 files changed, 386 insertions(+), 41 deletions(-)

diff --git a/spec/unit/rust-crypto/OutgoingRequestProcessor.spec.ts b/spec/unit/rust-crypto/OutgoingRequestProcessor.spec.ts
index 3501ec8faed..e63243d2913 100644
--- a/spec/unit/rust-crypto/OutgoingRequestProcessor.spec.ts
+++ b/spec/unit/rust-crypto/OutgoingRequestProcessor.spec.ts
@@ -27,8 +27,9 @@ import {
     UploadSigningKeysRequest,
     ToDeviceRequest,
 } from "@matrix-org/matrix-sdk-crypto-wasm";
+import fetchMock from "fetch-mock-jest";
 
-import { TypedEventEmitter } from "../../../src/models/typed-event-emitter";
+import { TypedEventEmitter } from "../../../src";
 import { HttpApiEvent, HttpApiEventHandlerMap, IHttpOpts, MatrixHttpApi, UIAuthCallback } from "../../../src";
 import { OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingRequestProcessor";
 import { defer } from "../../../src/utils";
@@ -274,4 +275,302 @@ describe("OutgoingRequestProcessor", () => {
         // ... and `makeOutgoingRequest` resolves satisfactorily
         await result;
     });
+
+    describe("Should retry requests", () => {
+        beforeEach(() => {
+            jest.useFakeTimers();
+
+            // here we use another httpApi instance in order to use fetchMock
+            const dummyEventEmitter = new TypedEventEmitter<HttpApiEvent, HttpApiEventHandlerMap>();
+            const httpApi = new MatrixHttpApi(dummyEventEmitter, {
+                baseUrl: "https://example.com",
+                prefix: "/_matrix",
+                onlyData: true,
+            });
+
+            processor = new OutgoingRequestProcessor(olmMachine, httpApi);
+        });
+
+        afterEach(() => {
+            jest.useRealTimers();
+            fetchMock.reset();
+        });
+
+        describe("Should retry on retryable errors", () => {
+            const retryableErrors: Array<[number, { status: number; body: { error: string } }]> = [
+                [429, { status: 429, body: { error: "Too Many Requests" } }],
+                [500, { status: 500, body: { error: "Internal Server Error" } }],
+                [502, { status: 502, body: { error: "Bad Gateway" } }],
+                [503, { status: 503, body: { error: "Service Unavailable" } }],
+                [504, { status: 504, body: { error: "Gateway timeout" } }],
+                [505, { status: 505, body: { error: "HTTP Version Not Supported" } }],
+                [506, { status: 506, body: { error: "Variant Also Negotiates" } }],
+                [507, { status: 507, body: { error: "Insufficient Storage" } }],
+                [508, { status: 508, body: { error: "Loop Detected" } }],
+                [510, { status: 510, body: { error: "Not Extended" } }],
+                [511, { status: 511, body: { error: "Network Authentication Required" } }],
+                [525, { status: 525, body: { error: "SSL Handshake Failed" } }],
+            ];
+            describe.each(retryableErrors)(`When status code is %s`, (_, error) => {
+                test.each(tests)(`for request of type %ss`, async (_, RequestClass, expectedMethod, expectedPath) => {
+                    // first, mock up a request as we might expect to receive it from the Rust layer ...
+                    const testBody = '{ "foo": "bar" }';
+                    const outgoingRequest = new RequestClass("1234", testBody);
+
+                    fetchMock.mock(expectedPath, error, { method: expectedMethod });
+
+                    const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
+
+                    // Run all timers and wait for the request promise to resolve/reject
+                    await Promise.all([jest.runAllTimersAsync(), requestPromise.catch(() => {})]);
+
+                    await expect(requestPromise).rejects.toThrow();
+
+                    // Should have ultimately made 5 requests (1 initial + 4 retries)
+                    const calls = fetchMock.calls(expectedPath);
+                    expect(calls).toHaveLength(5);
+
+                    // The promise should have been rejected
+                    await expect(requestPromise).rejects.toThrow();
+                });
+            });
+        });
+
+        it("should not retry if M_TOO_LARGE", async () => {
+            const testBody = '{ "messages": { "user": {"device": "bar" }}}';
+            const outgoingRequest = new ToDeviceRequest("1234", "custom.type", "12345", testBody);
+
+            fetchMock.put("express:/_matrix/client/v3/sendToDevice/:type/:txnId", {
+                status: 502,
+                body: {
+                    errcode: "M_TOO_LARGE",
+                    error: "Request too large",
+                },
+            });
+
+            const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
+
+            await Promise.all([requestPromise.catch(() => {}), jest.runAllTimersAsync()]);
+
+            await expect(requestPromise).rejects.toThrow();
+
+            const calls = fetchMock.calls("express:/_matrix/client/v3/sendToDevice/:type/:txnId");
+            expect(calls).toHaveLength(1);
+
+            // The promise should have been rejected
+            await expect(requestPromise).rejects.toThrow();
+        });
+
+        it("should retry on Failed to fetch connection errors", async () => {
+            let callCount = 0;
+            fetchMock.post("path:/_matrix/client/v3/keys/upload", (url, opts) => {
+                callCount++;
+                if (callCount == 2) {
+                    return {
+                        status: 200,
+                        body: "{}",
+                    };
+                } else {
+                    throw new Error("Failed to fetch");
+                }
+            });
+
+            const outgoingRequest = new KeysUploadRequest("1234", "{}");
+
+            const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
+
+            await Promise.all([requestPromise, jest.runAllTimersAsync()]);
+
+            const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
+            expect(calls).toHaveLength(2);
+            expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
+        });
+
+        it("should retry to send to-device", async () => {
+            let callCount = 0;
+            const testBody = '{ "messages": { "user": {"device": "bar" }}}';
+            const outgoingRequest = new ToDeviceRequest("1234", "custom.type", "12345", testBody);
+
+            fetchMock.put("express:/_matrix/client/v3/sendToDevice/:type/:txnId", (url, opts) => {
+                callCount++;
+                if (callCount == 2) {
+                    return {
+                        status: 200,
+                        body: "{}",
+                    };
+                } else {
+                    throw new Error("Failed to fetch");
+                }
+            });
+
+            const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
+
+            await Promise.all([requestPromise, jest.runAllTimersAsync()]);
+
+            const calls = fetchMock.calls("express:/_matrix/client/v3/sendToDevice/:type/:txnId");
+            expect(calls).toHaveLength(2);
+            expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
+        });
+
+        it("should retry to call with UIA", async () => {
+            let callCount = 0;
+            const testBody = '{ "foo": "bar" }';
+            const outgoingRequest = new UploadSigningKeysRequest(testBody);
+
+            fetchMock.post("path:/_matrix/client/v3/keys/device_signing/upload", (url, opts) => {
+                callCount++;
+                if (callCount == 2) {
+                    return {
+                        status: 200,
+                        body: "{}",
+                    };
+                } else {
+                    throw new Error("Failed to fetch");
+                }
+            });
+            const authCallback: UIAuthCallback = async (makeRequest) => {
+                return await makeRequest({ type: "test" });
+            };
+            const requestPromise = processor.makeOutgoingRequest(outgoingRequest, authCallback);
+
+            await Promise.all([requestPromise, jest.runAllTimersAsync()]);
+
+            const calls = fetchMock.calls("path:/_matrix/client/v3/keys/device_signing/upload");
+            expect(calls).toHaveLength(2);
+            // Will not mark as sent as it's a UIA request
+        });
+
+        it("should respect the server cooldown on LIMIT_EXCEEDED", async () => {
+            const retryAfterMs = 5000;
+            let callCount = 0;
+
+            fetchMock.post("path:/_matrix/client/v3/keys/upload", (url, opts) => {
+                callCount++;
+                if (callCount == 2) {
+                    return {
+                        status: 200,
+                        body: "{}",
+                    };
+                } else {
+                    return {
+                        status: 429,
+                        body: {
+                            errcode: "M_LIMIT_EXCEEDED",
+                            error: "Too many requests",
+                            retry_after_ms: retryAfterMs,
+                        },
+                    };
+                }
+            });
+
+            const outgoingRequest = new KeysUploadRequest("1234", "{}");
+
+            const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
+
+            // advance the timers by less than retryAfterMs
+            await jest.advanceTimersByTimeAsync(retryAfterMs - 1000);
+
+            // should not have made a second request yet
+            {
+                const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
+                expect(calls).toHaveLength(1);
+            }
+
+            // advance the timers past the remaining cooldown
+            await jest.advanceTimersByTimeAsync(retryAfterMs + 1000);
+
+            await requestPromise;
+
+            const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
+            expect(calls).toHaveLength(2);
+            expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
+        });
+
+        const nonRetryableErrors: Array<[number, { status: number; body: { errcode: string } }]> = [
+            [400, { status: 400, body: { errcode: "Bad Request" } }],
+            [401, { status: 401, body: { errcode: "M_UNKNOWN_TOKEN" } }],
+            [403, { status: 403, body: { errcode: "M_FORBIDDEN" } }],
+        ];
+
+        describe.each(nonRetryableErrors)("Should not retry on non-retryable errors", (_, error) => {
+            test.each(tests)("for %ss", async (_, RequestClass, expectedMethod, expectedPath) => {
+                const testBody = '{ "foo": "bar" }';
+                const outgoingRequest = new RequestClass("1234", testBody);
+
+                // @ts-ignore to avoid having to do if else to switch the method (.put/.post)
+                fetchMock[expectedMethod.toLowerCase()](expectedPath, {
+                    status: error.status,
+                    body: error.body,
+                });
+
+                const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
+
+                // Run all timers and wait for the request promise to resolve/reject
+                await Promise.all([jest.runAllTimersAsync(), requestPromise.catch(() => {})]);
+
+                await expect(requestPromise).rejects.toThrow();
+
+                // Should have only tried once
+                const calls = fetchMock.calls(expectedPath);
+                expect(calls).toHaveLength(1);
+
+                await expect(requestPromise).rejects.toThrow();
+            });
+        });
+
+        describe("Should not retry client timeouts", () => {
+            test.each(tests)("for %ss", async (_, RequestClass, expectedMethod, expectedPath) => {
+                const testBody = '{ "foo": "bar" }';
+                const outgoingRequest = new RequestClass("1234", testBody);
+
+                // @ts-ignore to avoid having to do if else to switch the method (.put/.post)
+                fetchMock[expectedMethod.toLowerCase()](expectedPath, () => {
+                    // This is what a client timeout error will throw
+                    throw new DOMException("The user aborted a request.", "AbortError");
+                });
+
+                const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
+
+                // Run all timers and wait for the request promise to resolve/reject
+                await Promise.all([jest.runAllTimersAsync(), requestPromise.catch(() => {})]);
+
+                await expect(requestPromise).rejects.toThrow();
+
+                // Should have only tried once
+                const calls = fetchMock.calls(expectedPath);
+                expect(calls).toHaveLength(1);
+                await expect(requestPromise).rejects.toThrow();
+            });
+        });
+
+        describe("Should retry until it works", () => {
+            it.each([1, 2, 3, 4])("should succeed if the call number %s is ok", async (successfulCall) => {
+                let callCount = 0;
+                fetchMock.post("path:/_matrix/client/v3/keys/upload", (url, opts) => {
+                    callCount++;
+                    if (callCount == successfulCall) {
+                        return {
+                            status: 200,
+                            body: "{}",
+                        };
+                    } else {
+                        return {
+                            status: 500,
+                            body: { error: "Internal server error" },
+                        };
+                    }
+                });
+
+                const outgoingRequest = new KeysUploadRequest("1234", "{}");
+
+                const requestPromise = processor.makeOutgoingRequest(outgoingRequest);
+
+                await Promise.all([requestPromise, jest.runAllTimersAsync()]);
+
+                const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
+                expect(calls).toHaveLength(successfulCall);
+                expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
+            });
+        });
+    });
 });
diff --git a/src/http-api/utils.ts b/src/http-api/utils.ts
index c49be740ef6..0b3c3554ffa 100644
--- a/src/http-api/utils.ts
+++ b/src/http-api/utils.ts
@@ -151,3 +151,49 @@ export async function retryNetworkOperation(maxAttempts: number, callback: ()
     }
     throw lastConnectionError;
 }
+
+/**
+ * Calculate the backoff time for a request retry attempt.
+ * This produces wait times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
+ * failure was due to a rate limited request, the time specified in the error is returned.
+ *
+ * Returns -1 if the error is not retryable, or if we reach the maximum number of attempts.
+ *
+ * @param err - The error thrown by the http call
+ * @param attempts - The number of attempts made so far, including the one that just failed.
+ * @param retryConnectionError - Whether to retry on {@link ConnectionError} (CORS, connection is down, etc.)
+ */
+export function calculateRetryBackoff(err: any, attempts: number, retryConnectionError: boolean): number {
+    if (attempts > 4) {
+        return -1; // give up
+    }
+
+    if (err instanceof ConnectionError && !retryConnectionError) {
+        return -1;
+    }
+
+    if (err.httpStatus && (err.httpStatus === 400 || err.httpStatus === 403 || err.httpStatus === 401)) {
+        // client error; no amount of retrying will save you now.
+        return -1;
+    }
+
+    if (err.name === "AbortError") {
+        // This is a client-side timeout (AbortError): the request already waited out the client
+        // timeout (60s/80s), so retrying could keep the caller blocked for a very long time.
+        return -1;
+    }
+
+    // If we are trying to send an event (or similar) that is too large in any way, then retrying won't help
+    if (err.name === "M_TOO_LARGE") {
+        return -1;
+    }
+
+    if (err.name === "M_LIMIT_EXCEEDED") {
+        const waitTime = err.data.retry_after_ms;
+        if (waitTime > 0) {
+            return waitTime;
+        }
+    }
+
+    return 1000 * Math.pow(2, attempts);
+}
diff --git a/src/rust-crypto/OutgoingRequestProcessor.ts b/src/rust-crypto/OutgoingRequestProcessor.ts
index 18b9d6b03fa..8e7e15584d5 100644
--- a/src/rust-crypto/OutgoingRequestProcessor.ts
+++ b/src/rust-crypto/OutgoingRequestProcessor.ts
@@ -15,11 +15,11 @@ limitations under the License.
 */
 
 import {
-    OlmMachine,
     KeysBackupRequest,
     KeysClaimRequest,
     KeysQueryRequest,
     KeysUploadRequest,
+    OlmMachine,
     RoomMessageRequest,
     SignatureUploadRequest,
     ToDeviceRequest,
@@ -27,8 +27,8 @@
 } from "@matrix-org/matrix-sdk-crypto-wasm";
 
 import { logger } from "../logger";
-import { IHttpOpts, MatrixHttpApi, Method } from "../http-api";
-import { logDuration, QueryDict } from "../utils";
+import { calculateRetryBackoff, IHttpOpts, MatrixHttpApi, Method } from "../http-api";
+import { logDuration, QueryDict, sleep } from "../utils";
 import { IAuthDict, UIAuthCallback } from "../interactive-auth";
 import { UIAResponse } from "../@types/uia";
 import { ToDeviceMessageId } from "../@types/event";
@@ -71,15 +71,15 @@ export class OutgoingRequestProcessor {
          * for the complete list of request types
          */
         if (msg instanceof KeysUploadRequest) {
-            resp = await this.rawJsonRequest(Method.Post, "/_matrix/client/v3/keys/upload", {}, msg.body);
+            resp = await this.requestWithRetry(Method.Post, "/_matrix/client/v3/keys/upload", {}, msg.body);
         } else if (msg instanceof KeysQueryRequest) {
-            resp = await this.rawJsonRequest(Method.Post, "/_matrix/client/v3/keys/query", {}, msg.body);
+            resp = await this.requestWithRetry(Method.Post, "/_matrix/client/v3/keys/query", {}, msg.body);
         } else if (msg instanceof KeysClaimRequest) {
-            resp = await this.rawJsonRequest(Method.Post, "/_matrix/client/v3/keys/claim", {}, msg.body);
+            resp = await this.requestWithRetry(Method.Post, "/_matrix/client/v3/keys/claim", {}, msg.body);
         } else if (msg instanceof SignatureUploadRequest) {
-            resp = await this.rawJsonRequest(Method.Post, "/_matrix/client/v3/keys/signatures/upload", {}, msg.body);
+            resp = await this.requestWithRetry(Method.Post, "/_matrix/client/v3/keys/signatures/upload", {}, msg.body);
         } else if (msg instanceof KeysBackupRequest) {
-            resp = await this.rawJsonRequest(
+            resp = await this.requestWithRetry(
                 Method.Put,
                 "/_matrix/client/v3/room_keys/keys",
                 { version: msg.version },
@@ -91,7 +91,7 @@ export class OutgoingRequestProcessor {
             const path =
                 `/_matrix/client/v3/rooms/${encodeURIComponent(msg.room_id)}/send/` +
                 `${encodeURIComponent(msg.event_type)}/${encodeURIComponent(msg.txn_id)}`;
-            resp = await this.rawJsonRequest(Method.Put, path, {}, msg.body);
+            resp = await this.requestWithRetry(Method.Put, path, {}, msg.body);
         } else if (msg instanceof UploadSigningKeysRequest) {
             await this.makeRequestWithUIA(
                 Method.Post,
@@ -154,7 +154,7 @@ export class OutgoingRequestProcessor {
         const path =
             `/_matrix/client/v3/sendToDevice/${encodeURIComponent(request.event_type)}/` +
             encodeURIComponent(request.txn_id);
-        return await this.rawJsonRequest(Method.Put, path, {}, request.body);
+        return await this.requestWithRetry(Method.Put, path, {}, request.body);
     }
 
     private async makeRequestWithUIA<T>(
         method: Method,
         path: string,
         queryParams: QueryDict,
         body: string,
         uiaCallback: UIAuthCallback<T> | undefined,
     ): Promise<string> {
         if (!uiaCallback) {
-            return await this.rawJsonRequest(method, path, queryParams, body);
+            return await this.requestWithRetry(method, path, queryParams, body);
         }
 
         const parsedBody = JSON.parse(body);
@@ -176,7 +176,7 @@ export class OutgoingRequestProcessor {
             if (auth !== null) {
                 newBody.auth = auth;
             }
-            const resp = await this.rawJsonRequest(method, path, queryParams, JSON.stringify(newBody));
+            const resp = await this.requestWithRetry(method, path, queryParams, JSON.stringify(newBody));
             return JSON.parse(resp) as T;
         };
 
@@ -184,6 +184,31 @@ export class OutgoingRequestProcessor {
         return JSON.stringify(resp);
     }
 
+    private async requestWithRetry(
+        method: Method,
+        path: string,
+        queryParams: QueryDict,
+        body: string,
+    ): Promise<string> {
+        let currentRetryCount = 0;
+
+        // eslint-disable-next-line no-constant-condition
+        while (true) {
+            try {
+                return await this.rawJsonRequest(method, path, queryParams, body);
+            } catch (e) {
+                currentRetryCount++;
+                const backoff = calculateRetryBackoff(e, currentRetryCount, true);
+                if (backoff < 0) {
+                    // Max number of retries reached, or the error is not retryable: rethrow the error.
+                    throw e;
+                }
+                // wait for the specified time and then retry the request
+                await sleep(backoff);
+            }
+        }
+    }
+
     private async rawJsonRequest(method: Method, path: string, queryParams: QueryDict, body: string): Promise<string> {
         const opts = {
             // inhibit the JSON stringification and parsing within HttpApi.
diff --git a/src/scheduler.ts b/src/scheduler.ts
index 41612f1c902..6dfd212c72c 100644
--- a/src/scheduler.ts
+++ b/src/scheduler.ts
@@ -22,7 +22,7 @@ import { logger } from "./logger";
 import { MatrixEvent } from "./models/event";
 import { EventType } from "./@types/event";
 import { defer, IDeferred, removeElement } from "./utils";
-import { ConnectionError, MatrixError } from "./http-api";
+import { calculateRetryBackoff, MatrixError } from "./http-api";
 import { ISendEventResponse } from "./@types/requests";
 
 const DEBUG = false; // set true to enable console logging.
@@ -43,38 +43,13 @@ type ProcessFunction = (event: MatrixEvent) => Promise;
 // eslint-disable-next-line camelcase
 export class MatrixScheduler {
     /**
-     * Retries events up to 4 times using exponential backoff. This produces wait
-     * times of 2, 4, 8, and 16 seconds (30s total) after which we give up. If the
-     * failure was due to a rate limited request, the time specified in the error is
-     * waited before being retried.
+     * Default retry algorithm for the matrix scheduler. Retries events up to 4 times with exponential backoff.
      * @param attempts - Number of attempts that have been made, including the one that just failed (ie. starting at 1)
     * @see retryAlgorithm
      */
     // eslint-disable-next-line @typescript-eslint/naming-convention
     public static RETRY_BACKOFF_RATELIMIT(event: MatrixEvent | null, attempts: number, err: MatrixError): number {
-        if (err.httpStatus === 400 || err.httpStatus === 403 || err.httpStatus === 401) {
-            // client error; no amount of retrying with save you now.
-            return -1;
-        }
-        if (err instanceof ConnectionError) {
-            return -1;
-        }
-
-        // if event that we are trying to send is too large in any way then retrying won't help
-        if (err.name === "M_TOO_LARGE") {
-            return -1;
-        }
-
-        if (err.name === "M_LIMIT_EXCEEDED") {
-            const waitTime = err.data.retry_after_ms;
-            if (waitTime > 0) {
-                return waitTime;
-            }
-        }
-        if (attempts > 4) {
-            return -1; // give up
-        }
-        return 1000 * Math.pow(2, attempts);
+        return calculateRetryBackoff(err, attempts, false);
     }
 
     /**
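
For reference, the retry behaviour introduced by this patch boils down to the minimal TypeScript sketch below. It reuses calculateRetryBackoff and sleep exactly as added above; sendRequest is a hypothetical stand-in for OutgoingRequestProcessor.rawJsonRequest, and the import paths assume the same relative layout used by src/rust-crypto/OutgoingRequestProcessor.ts.

import { calculateRetryBackoff } from "../http-api";
import { sleep } from "../utils";

// Hypothetical stand-in for OutgoingRequestProcessor.rawJsonRequest.
declare function sendRequest(): Promise<string>;

async function sendWithRetry(): Promise<string> {
    let attempts = 0;
    // eslint-disable-next-line no-constant-condition
    while (true) {
        try {
            return await sendRequest();
        } catch (e) {
            attempts++;
            // calculateRetryBackoff returns -1 for non-retryable failures (400/401/403,
            // client-side AbortError, M_TOO_LARGE, or more than 4 attempts), the server's
            // retry_after_ms for M_LIMIT_EXCEEDED, and otherwise backs off 2s/4s/8s/16s.
            const backoff = calculateRetryBackoff(e, attempts, true);
            if (backoff < 0) {
                throw e;
            }
            await sleep(backoff);
        }
    }
}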