Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic retry for rust crypto outgoing requests #4061

Merged
merged 10 commits into from
Feb 26, 2024
278 changes: 277 additions & 1 deletion spec/unit/rust-crypto/OutgoingRequestProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import {
UploadSigningKeysRequest,
ToDeviceRequest,
} from "@matrix-org/matrix-sdk-crypto-wasm";
import fetchMock from "fetch-mock-jest";

import { TypedEventEmitter } from "../../../src/models/typed-event-emitter";
import { TypedEventEmitter } from "../../../src";
import { HttpApiEvent, HttpApiEventHandlerMap, IHttpOpts, MatrixHttpApi, UIAuthCallback } from "../../../src";
import { OutgoingRequestProcessor } from "../../../src/rust-crypto/OutgoingRequestProcessor";
import { defer } from "../../../src/utils";
Expand Down Expand Up @@ -274,4 +275,279 @@ describe("OutgoingRequestProcessor", () => {
// ... and `makeOutgoingRequest` resolves satisfactorily
await result;
});

describe("Should retry requests", () => {
beforeEach(() => {
jest.useFakeTimers();

// here we use another httpApi instance in order to use fetchMock
const dummyEventEmitter = new TypedEventEmitter<HttpApiEvent, HttpApiEventHandlerMap>();
const httpApi = new MatrixHttpApi(dummyEventEmitter, {
baseUrl: "https://example.com",
prefix: "/_matrix",
onlyData: true,
});

processor = new OutgoingRequestProcessor(olmMachine, httpApi);
});

afterEach(() => {
jest.useRealTimers();
fetchMock.reset();
});

describe("Should retry on retryable errors", () => {
const retryableErrors: Array<[number, { status: number; body: { error: string } }]> = [
[429, { status: 429, body: { error: "Too Many Requests" } }],
[500, { status: 500, body: { error: "Internal Server Error" } }],
[502, { status: 502, body: { error: "Bad Gateway" } }],
[503, { status: 503, body: { error: "Service Unavailable" } }],
[504, { status: 504, body: { error: "Gateway timeout" } }],
[504, { status: 504, body: { error: "Gateway Timeout" } }],
[505, { status: 505, body: { error: "HTTP Version Not Supported" } }],
[506, { status: 506, body: { error: "Variant Also Negotiates" } }],
[507, { status: 507, body: { error: "Insufficient Storage" } }],
[508, { status: 508, body: { error: "Loop Detected" } }],
[510, { status: 510, body: { error: "Not Extended" } }],
[511, { status: 511, body: { error: "Network Authentication Required" } }],
[525, { status: 525, body: { error: "SSL Handshake Failed" } }],
];
describe.each(retryableErrors)(`When status code is %s`, (_, error) => {
test.each(tests)(`for request of type %ss`, async (_, RequestClass, expectedMethod, expectedPath) => {
// first, mock up a request as we might expect to receive it from the Rust layer ...
const testBody = '{ "foo": "bar" }';
const outgoingRequest = new RequestClass("1234", testBody);

// @ts-ignore to avoid having to do if else to switch the method (.put/.post)
fetchMock[expectedMethod.toLowerCase()](expectedPath, error);
BillCarsonFr marked this conversation as resolved.
Show resolved Hide resolved

const requestPromise = processor.makeOutgoingRequest(outgoingRequest);

// Run all timers and wait for the request promise to resolve/reject
await Promise.all([jest.runAllTimersAsync(), requestPromise.catch(() => {})]);

await expect(requestPromise).rejects.toThrow();

// Should have ultimately made 4 requests (1 initial + 3 retries)
const calls = fetchMock.calls(expectedPath);
expect(calls).toHaveLength(4);

// The promise should have been rejected
await expect(requestPromise).rejects.toThrow();
});
});
});

it("should retry on Failed to fetch connection errors", async () => {
let callCount = 0;
fetchMock.post("path:/_matrix/client/v3/keys/upload", (url, opts) => {
callCount++;
if (callCount == 2) {
return {
status: 200,
body: "{}",
};
} else {
throw new Error("Failed to fetch");
}
});

const outgoingRequest = new KeysUploadRequest("1234", "{}");

const requestPromise = processor.makeOutgoingRequest(outgoingRequest);

await Promise.all([requestPromise, jest.runAllTimersAsync()]);

const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
expect(calls).toHaveLength(2);
expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
});

it("should retry to send to-device", async () => {
let callCount = 0;
const testBody = '{ "messages": { "user": {"device": "bar" }}}';
const outgoingRequest = new ToDeviceRequest("1234", "custom.type", "12345", testBody);

fetchMock.put("express:/_matrix/client/v3/sendToDevice/:type/:txnId", (url, opts) => {
callCount++;
if (callCount == 2) {
return {
status: 200,
body: "{}",
};
} else {
throw new Error("Failed to fetch");
}
});

const requestPromise = processor.makeOutgoingRequest(outgoingRequest);

await Promise.all([requestPromise, jest.runAllTimersAsync()]);

const calls = fetchMock.calls("express:/_matrix/client/v3/sendToDevice/:type/:txnId");
expect(calls).toHaveLength(2);
expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
});

it("should retry to call with UIA", async () => {
let callCount = 0;
const testBody = '{ "foo": "bar" }';
const outgoingRequest = new UploadSigningKeysRequest(testBody);

fetchMock.post("path:/_matrix/client/v3/keys/device_signing/upload", (url, opts) => {
callCount++;
if (callCount == 2) {
return {
status: 200,
body: "{}",
};
} else {
throw new Error("Failed to fetch");
}
});
const authCallback: UIAuthCallback<Object> = async (makeRequest) => {
return await makeRequest({ type: "test" });
};
const requestPromise = processor.makeOutgoingRequest(outgoingRequest, authCallback);

await Promise.all([requestPromise, jest.runAllTimersAsync()]);

const calls = fetchMock.calls("path:/_matrix/client/v3/keys/device_signing/upload");
expect(calls).toHaveLength(2);
// Will not mark as sent as it's a UIA request
});

it("should retry on respect server cool down on LIMIT_EXCEEDED", async () => {
const retryAfterMs = 5000;
let callCount = 0;

fetchMock.post("path:/_matrix/client/v3/keys/upload", (url, opts) => {
callCount++;
if (callCount == 2) {
return {
status: 200,
body: "{}",
};
} else {
return {
status: 429,
body: {
errcode: "M_LIMIT_EXCEEDED",
error: "Too many requests",
retry_after_ms: retryAfterMs,
},
};
}
});

const outgoingRequest = new KeysUploadRequest("1234", "{}");

const requestPromise = processor.makeOutgoingRequest(outgoingRequest);

// advanced by less than the retryAfterMs
await jest.advanceTimersByTimeAsync(retryAfterMs - 1000);

// should not have made a second request yet
{
const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
expect(calls).toHaveLength(1);
}

// advanced by the remaining time
await jest.advanceTimersByTimeAsync(retryAfterMs + 1000);

await requestPromise;

const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
expect(calls).toHaveLength(2);
expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
});

const nonRetryableErrors: Array<[number, { status: number; body: { errcode: string } }]> = [
[400, { status: 400, body: { errcode: "Bad Request" } }],
[401, { status: 401, body: { errcode: "M_UNKNOWN_TOKEN" } }],
[403, { status: 403, body: { errcode: "M_FORBIDDEN" } }],
];

describe.each(nonRetryableErrors)("Should not retry all sort of errors", (_, error) => {
test.each(tests)("for %ss", async (_, RequestClass, expectedMethod, expectedPath) => {
const testBody = '{ "foo": "bar" }';
const outgoingRequest = new RequestClass("1234", testBody);

// @ts-ignore to avoid having to do if else to switch the method (.put/.post)
fetchMock[expectedMethod.toLowerCase()](expectedPath, {
status: error.status,
body: error.body,
});

const requestPromise = processor.makeOutgoingRequest(outgoingRequest);

// Run all timers and wait for the request promise to resolve/reject
await Promise.all([jest.runAllTimersAsync(), requestPromise.catch(() => {})]);

await expect(requestPromise).rejects.toThrow();

// Should have ultimately made 4 requests (1 initial + 3 retries)
const calls = fetchMock.calls(expectedPath);
expect(calls).toHaveLength(1);

await expect(requestPromise).rejects.toThrow();
});
});

describe("Should not retry client timeouts", () => {
test.each(tests)("for %ss", async (_, RequestClass, expectedMethod, expectedPath) => {
const testBody = '{ "foo": "bar" }';
const outgoingRequest = new RequestClass("1234", testBody);

// @ts-ignore to avoid having to do if else to switch the method (.put/.post)
fetchMock[expectedMethod.toLowerCase()](expectedPath, () => {
// This is what a client timeout error will throw
throw new DOMException("The user aborted a request.", "AbortError");
});

const requestPromise = processor.makeOutgoingRequest(outgoingRequest);

// Run all timers and wait for the request promise to resolve/reject
await Promise.all([jest.runAllTimersAsync(), requestPromise.catch(() => {})]);

await expect(requestPromise).rejects.toThrow();

// Should have ultimately made 4 requests (1 initial + 3 retries)
const calls = fetchMock.calls(expectedPath);
expect(calls).toHaveLength(1);
await expect(requestPromise).rejects.toThrow();
});
});

describe("Should retry until it works", () => {
it.each([1, 2, 3, 4])("should succeed if the call number %s is ok", async (successfulCall) => {
let callCount = 0;
fetchMock.post("path:/_matrix/client/v3/keys/upload", (url, opts) => {
callCount++;
if (callCount == successfulCall) {
return {
status: 200,
body: "{}",
};
} else {
return {
status: 500,
body: { error: "Internal server error" },
};
}
});

const outgoingRequest = new KeysUploadRequest("1234", "{}");

const requestPromise = processor.makeOutgoingRequest(outgoingRequest);

await Promise.all([requestPromise, jest.runAllTimersAsync()]);

const calls = fetchMock.calls("path:/_matrix/client/v3/keys/upload");
expect(calls).toHaveLength(successfulCall);
expect(olmMachine.markRequestAsSent).toHaveBeenCalled();
});
});
});
});
Loading
Loading