FileShare Attachments

Uses a file share to store attachments for messages.

Usage

Two settings are required as part of the default usage:

  • A file share or directory location.
  • A default time to keep for attachments.

configuration.EnableAttachments(
    fileShare: "networkSharePath",
    timeToKeep: _ => TimeSpan.FromDays(7));

configuration.EnableAttachments(
    connectionFactory: async cancel =>
    {
        var connection = new SqlConnection(connectionString);
        try
        {
            await connection.OpenAsync(cancel).ConfigureAwait(false);
            return connection;
        }
        catch
        {
            await connection.DisposeAsync();
            throw;
        }
    },
    timeToKeep: _ => TimeSpan.FromDays(7));

Recommended Usage

Use the NServiceBus.Attachments.FileShare.TimeToKeep.Default method to calculate how long attachments are kept before cleanup.

This usage results in the following configuration:

configuration.EnableAttachments(
    fileShare: "networkSharePath",
    timeToKeep: TimeToKeep.Default);

configuration.EnableAttachments(
    connectionFactory: OpenConnection,
    timeToKeep: TimeToKeep.Default);

Data Cleanup

Attachment cleanup is enabled by default. It can be disabled using the following:

var attachments = configuration.EnableAttachments(
    fileShare: "networkSharePath",
    timeToKeep: TimeToKeep.Default);
attachments.DisableCleanupTask();

var attachments = configuration.EnableAttachments(
    connectionFactory: OpenConnection,
    timeToKeep: TimeToKeep.Default);
attachments.DisableCleanupTask();

Controlling attachment lifetime

When the cleanup task runs, it uses the Expiry column to determine whether a given attachment should be deleted. This column is populated when an attachment is written. When adding an attachment to an outgoing message, all methods accept an optional timeToKeep parameter of type GetTimeToKeep. GetTimeToKeep is defined as:

public delegate TimeSpan GetTimeToKeep(TimeSpan? messageTimeToBeReceived);

Here messageTimeToBeReceived is the value of TimeToBeReceived. If no timeToKeep parameter is defined for a specific attachment, the endpoint-level timeToKeep is used.

The result of timeToKeep is then added to the current date and persisted to the Expiry column.

The method TimeToKeep.Default provides a recommended default for attachment lifetime calculation:

  • If TimeToBeReceived is defined, keep the attachment for twice that time.
  • Otherwise, keep it for 10 days.
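
The rule above, together with the expiry calculation, can be sketched as follows. This is a minimal illustration and not the library source: a Func<TimeSpan?, TimeSpan> stands in for the GetTimeToKeep delegate so the sketch compiles on its own, and defaultTimeToKeep and CalculateExpiry are hypothetical names. Very large TimeToBeReceived values, which could overflow when doubled, are not handled here.

```csharp
using System;

// Same shape as the GetTimeToKeep delegate shown above. Func is used here
// only so that the sketch compiles without the library (assumption).
Func<TimeSpan?, TimeSpan> defaultTimeToKeep = messageTimeToBeReceived =>
{
    if (messageTimeToBeReceived is { } timeToBeReceived)
    {
        // TimeToBeReceived is defined: keep the attachment for twice that time.
        return timeToBeReceived + timeToBeReceived;
    }

    // TimeToBeReceived is not defined: keep the attachment for 10 days.
    return TimeSpan.FromDays(10);
};

// Hypothetical helper: the expiry is the current date plus the timeToKeep result.
DateTime CalculateExpiry(DateTime now, TimeSpan? messageTimeToBeReceived) =>
    now + defaultTimeToKeep(messageTimeToBeReceived);

var now = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
Console.WriteLine(CalculateExpiry(now, TimeSpan.FromDays(3))); // six days after now
Console.WriteLine(CalculateExpiry(now, null));                 // ten days after now
```

A per-attachment delegate, such as _ => TimeSpan.FromDays(1), has the same shape and would replace defaultTimeToKeep for that attachment only.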

Reading and writing attachments

Writing attachments to an outgoing message

Approaches to using attachments for an outgoing message.

Note: Stream.Dispose is called after the data has been persisted. As such it is not necessary for any code using attachments to perform this cleanup.

While the examples below illustrate adding an attachment to SendOptions, equivalent operations can be performed on PublishOptions and ReplyOptions.

Factory Approach

The recommended approach for adding an attachment is by providing a delegate that constructs the stream. The execution of this delegate is then deferred until later in the outgoing pipeline, when the instance of the stream is required to be persisted.

There are both async and sync variants.

class HandlerFactory :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, HandlerContext context)
    {
        var sendOptions = new SendOptions();
        var attachments = sendOptions.Attachments();
        attachments.Add(
            name: "attachment1",
            streamFactory: () => File.OpenRead("FilePath.txt"));
        return context.Send(new OtherMessage(), sendOptions);
    }
}

class HandlerFactoryAsync :
    IHandleMessages<MyMessage>
{
    static HttpClient httpClient = new();

    public Task Handle(MyMessage message, HandlerContext context)
    {
        var sendOptions = new SendOptions();
        var attachments = sendOptions.Attachments();
        attachments.Add(
            name: "attachment1",
            streamFactory: () => httpClient.GetStreamAsync("theUrl"));
        return context.Send(new OtherMessage(), sendOptions);
    }
}
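
The deferred execution described for the factory approach can be illustrated without the library. In this minimal sketch the pending and persisted dictionaries are hypothetical stand-ins for the outgoing pipeline and the attachment store. It shows only that adding a factory does not open a stream, and that the stream is created, copied, and disposed later.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical stand-in for the pending attachments of an outgoing message.
var pending = new Dictionary<string, Func<Stream>>();
var factoryCalls = 0;

// "Adding" an attachment only records the delegate; no stream is opened yet.
pending.Add("attachment1", () =>
{
    factoryCalls++;
    return new MemoryStream(Encoding.UTF8.GetBytes("attachment content"));
});
Console.WriteLine($"Factory calls after Add: {factoryCalls}"); // 0

// Later (in the library: further along the outgoing pipeline) each factory
// is invoked, the data is persisted, and the stream is disposed.
var persisted = new Dictionary<string, byte[]>();
foreach (var (name, streamFactory) in pending)
{
    using var stream = streamFactory();
    using var target = new MemoryStream();
    stream.CopyTo(target);
    persisted[name] = target.ToArray();
}
Console.WriteLine($"Factory calls after persisting: {factoryCalls}"); // 1
```

Because the stream only exists while it is being persisted, a handler that fails before the send is dispatched never opens the underlying file or connection.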

Instance Approach

In some cases an instance of a stream is already available in scope and as such it can be passed directly.

class HandlerInstance :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, HandlerContext context)
    {
        var sendOptions = new SendOptions();
        var attachments = sendOptions.Attachments();
        var stream = File.OpenRead("FilePath.txt");
        attachments.Add(
            name: "attachment1",
            stream: stream,
            cleanup: () => File.Delete("FilePath.txt"));
        return context.Send(new OtherMessage(), sendOptions);
    }
}

Reading attachments for an incoming message

Approaches to using attachments for the current incoming message.

Process named with a delegate

Processes an attachment with a specific name.

class HandlerProcessStream :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, HandlerContext context)
    {
        var attachments = context.Attachments();
        return attachments.ProcessStream(
            name: "attachment1",
            action: async (stream, token) =>
            {
                // Use the attachment stream. In this example, copy it to a file.
                await using var fileToCopyTo = File.Create("FilePath.txt");
                await stream.CopyToAsync(fileToCopyTo, token);
            },
            context.CancellationToken);
    }
}

Process all with a delegate

Processes all attachments.

class HandlerProcessStreams :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, HandlerContext context)
    {
        var attachments = context.Attachments();
        return attachments.ProcessStreams(
            action: async (stream, cancel) =>
            {
                // Use the attachment stream. In this example, copy it to a file.
                await using var file = File.Create($"{stream.Name}.txt");
                await stream.CopyToAsync(file, cancel);
            },
            context.CancellationToken);
    }
}

Copy to a stream

Copy an attachment with a specific name to another stream.

class HandlerCopyTo :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, HandlerContext context)
    {
        var attachments = context.Attachments();
        await using var fileToCopyTo = File.Create("FilePath.txt");
        await attachments.CopyTo("attachment1", fileToCopyTo, context.CancellationToken);
    }
}

Get an instance of a stream

Get a stream for an attachment with a specific name.

class HandlerGetStream :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, HandlerContext context)
    {
        var attachments = context.Attachments();
        await using var stream = await attachments.GetStream("attachment1", context.CancellationToken);
        // Use the attachment stream. In this example, copy it to a file.
        await using var fileToCopyTo = File.Create("FilePath.txt");
        await stream.CopyToAsync(fileToCopyTo, context.CancellationToken);
    }
}

Get data as bytes

Get a byte array for an attachment with a specific name.

WARNING: This should only be used when the data size is known to be small, as it causes the full size of the attachment to be allocated in memory.

class HandlerGetBytes :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, HandlerContext context)
    {
        var attachments = context.Attachments();
        var bytes = await attachments.GetBytes("attachment1", context.CancellationToken);
        // use the byte array
    }
}
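
The memory cost behind the warning above can be seen in a library-free sketch. The 1 MB MemoryStream below is a hypothetical stand-in for an attachment, and the 81920 byte buffer size is an arbitrary choice. Streaming moves the data through one fixed-size buffer, while the byte array approach allocates the full payload at once.

```csharp
using System;
using System.IO;

// Hypothetical 1 MB payload standing in for an attachment stream.
const int attachmentSize = 1024 * 1024;
using var attachment = new MemoryStream(new byte[attachmentSize]);

// Streaming: the data passes through one fixed-size buffer, so the memory
// used for the copy does not grow with the attachment size.
var buffer = new byte[81920];
long streamedBytes = 0;
int read;
while ((read = attachment.Read(buffer, 0, buffer.Length)) > 0)
{
    // A real handler would write each chunk to a file or another stream.
    streamedBytes += read;
}

// Byte array: the whole payload has to be allocated at once.
attachment.Position = 0;
using var target = new MemoryStream();
attachment.CopyTo(target);
var bytes = target.ToArray();

Console.WriteLine($"Streamed {streamedBytes} bytes through a {buffer.Length} byte buffer");
Console.WriteLine($"Byte array length: {bytes.Length}");
```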

Reading attachments for a specific message

All of the above examples have companion methods that are suffixed with ForMessage. These methods allow a handler or saga to read any attachments as long as the message id for that attachment is known. For example, processing all attachments for a specific message could be done as follows:

class HandlerProcessStreamsForMessage :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, HandlerContext context)
    {
        var attachments = context.Attachments();
        return attachments.ProcessStreamsForMessage(
            messageId: "theMessageId",
            action: async (stream, cancel) =>
            {
                // Use the attachment stream. In this example, copy it to a file.
                await using var toCopyTo = File.Create($"{stream.Name}.txt");
                await stream.CopyToAsync(toCopyTo, cancel);
            },
            context.CancellationToken);
    }
}

This can be helpful in a saga that is operating in a Scatter-Gather mode. Instead of storing all binaries inside the saga persister, the saga can store the message ids and then, at a later point in time, access those attachments.
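
The gather step might look like the following sketch. It does not use the library or a real saga: attachmentMessageIds stands in for saga data, ProcessAttachmentsForMessage stands in for a call such as ProcessStreamsForMessage, and OnResponse, expectedResponses, and the message ids are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical saga state: only message ids are stored, not attachment binaries.
var attachmentMessageIds = new List<string>();
const int expectedResponses = 3;

// Hypothetical stand-in for a call such as ProcessStreamsForMessage(messageId, ...).
var processed = new List<string>();
Task ProcessAttachmentsForMessage(string messageId)
{
    processed.Add(messageId);
    return Task.CompletedTask;
}

// Called once per gathered response. Attachments are read only after
// every expected response has arrived.
async Task OnResponse(string messageId)
{
    attachmentMessageIds.Add(messageId);
    if (attachmentMessageIds.Count < expectedResponses)
    {
        return;
    }

    foreach (var id in attachmentMessageIds)
    {
        await ProcessAttachmentsForMessage(id);
    }
}

await OnResponse("message-1");
await OnResponse("message-2");
await OnResponse("message-3");
Console.WriteLine(string.Join(", ", processed)); // message-1, message-2, message-3
```

With this approach the attachments must still exist when the saga completes, so the timeToKeep used when writing them needs to cover the expected gather duration.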

Unit Testing

The below examples also use the NServiceBus.Testing extension.

Testing outgoing attachments

public class Handler :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, HandlerContext context)
    {
        var options = new SendOptions();
        var attachments = options.Attachments();
        attachments.Add("theName", () => File.OpenRead("aFilePath"));
        return context.Send(new OtherMessage(), options);
    }
}

[Fact]
public async Task TestOutgoingAttachments()
{
    //Arrange
    var context = new RecordingHandlerContext();
    var handler = new Handler();

    //Act
    await handler.Handle(new(), context);

    // Assert
    var sentMessage = context.Sent.Single();
    var attachments = sentMessage.Options.Attachments();
    var attachment = attachments.Items.Single();
    Assert.Contains("theName", attachment.Name);
    Assert.True(attachments.HasPendingAttachments);
}

Testing incoming attachments

Injecting a custom instance

To mock or verify incoming attachments, it is necessary to inject an instance of IMessageAttachments into the current IMessageHandlerContext. This can be done using the MockAttachmentHelper.InjectAttachmentsInstance() extension method, which exists in the NServiceBus.Attachments.Testing namespace.

var context = new RecordingHandlerContext();
var mockMessageAttachments = new MyMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);

The implementation of IMessageHandlerContext can be a custom coded mock or constructed using any of the popular mocking/assertion frameworks.

There is a default implementation of IMessageAttachments named MockMessageAttachments. This implementation stubs out all methods. All members are virtual, so it can be used as a simplified base class for custom mocks.

public class CustomMockMessageAttachments :
    MockMessageAttachments
{
    public override Task<AttachmentBytes> GetBytes(Cancel cancel = default)
    {
        GetBytesWasCalled = true;
        return Task.FromResult(new AttachmentBytes("name", [5]));
    }

    public bool GetBytesWasCalled { get; private set; }
}

Putting these parts together allows a handler, using incoming attachments, to be tested.

public class Handler :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, HandlerContext context)
    {
        var attachment = context.Attachments();
        var bytes = await attachment.GetBytes(context.CancellationToken);
    }
}

[Fact]
public async Task TestIncomingAttachment()
{
    //Arrange
    var context = new RecordingHandlerContext();
    var handler = new Handler();
    var mockMessageAttachments = new CustomMockMessageAttachments();
    context.InjectAttachmentsInstance(mockMessageAttachments);

    //Act
    await handler.Handle(new(), context);

    //Assert
    Assert.True(mockMessageAttachments.GetBytesWasCalled);
}
