Skip to content

Commit

Permalink
feat: rewrite the entire thing to dataflow
Browse files Browse the repository at this point in the history
  • Loading branch information
CumpsD committed Jan 31, 2024
1 parent 6be8e42 commit 646f49d
Show file tree
Hide file tree
Showing 15 changed files with 1,008 additions and 466 deletions.
485 changes: 30 additions & 455 deletions chainflip-insights/Bot.cs

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions chainflip-insights/Configuration/BotConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public string? SwapUrl
get; init;
}

[Required]
[NotNull]
public int? FeedingDelay
{
get; init;
}

[Required]
[NotNull]
public int? SwapInfoDelay
Expand Down
121 changes: 121 additions & 0 deletions chainflip-insights/Consumers/Discord/DiscordConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
namespace ChainflipInsights.Consumers.Discord
{
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using ChainflipInsights.Configuration;
using ChainflipInsights.Feeders.Swap;
using ChainflipInsights.Infrastructure.Pipelines;
using global::Discord;
using global::Discord.WebSocket;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

public class DiscordConsumer
{
private readonly ILogger<DiscordConsumer> _logger;
private readonly BotConfiguration _configuration;
private readonly DiscordSocketClient _discordClient;

public DiscordConsumer(
ILogger<DiscordConsumer> logger,
IOptions<BotConfiguration> options,
DiscordSocketClient discordClient)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_configuration = options.Value ?? throw new ArgumentNullException(nameof(options));
_discordClient = discordClient ?? throw new ArgumentNullException(nameof(discordClient));
}

public ITargetBlock<SwapInfo> Build(
CancellationToken ct)
{
var announcer = BuildAnnouncer(ct);
return new EncapsulatingTarget<SwapInfo, SwapInfo>(announcer, announcer);
}

private ActionBlock<SwapInfo> BuildAnnouncer(
CancellationToken ct)
{
var logging = new ActionBlock<SwapInfo>(
swap =>
{
if (!_configuration.EnableDiscord.Value)
return;
VerifyConnection();
var text =
$"{swap.Emoji} Swapped " +
$"**{swap.DepositAmountFormatted} {swap.SourceAsset}** (*${swap.DepositValueUsdFormatted}*) → " +
$"**{swap.EgressAmountFormatted} {swap.DestinationAsset}** (*${swap.EgressValueUsdFormatted}*) " +
$"// **[view swap on explorer]({_configuration.ExplorerUrl}{swap.Id})**";
if (_discordClient.ConnectionState != ConnectionState.Connected)
return;
var infoChannel = (ITextChannel)_discordClient.GetChannel(_configuration.DiscordSwapInfoChannelId.Value);
try
{
infoChannel
.SendMessageAsync(
text,
flags: MessageFlags.SuppressEmbeds)
.GetAwaiter()
.GetResult();
}
catch (Exception e)
{
_logger.LogError(e, "Discord meh.");
}
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
CancellationToken = ct
});

logging.Completion.ContinueWith(
task =>
{
_logger.LogDebug(
"Discord Logging completed, {Status}",
task.Status);
if (_discordClient.ConnectionState == ConnectionState.Disconnected)
return;
_logger.LogDebug("Disconnecting Discord client");
_discordClient
.LogoutAsync()
.GetAwaiter()
.GetResult();
_discordClient
.StopAsync()
.GetAwaiter()
.GetResult();
},
ct);

return logging;
}

private void VerifyConnection()
{
_discordClient
.LoginAsync(
TokenType.Bot,
_configuration.DiscordToken)
.GetAwaiter()
.GetResult();

_discordClient
.StartAsync()
.GetAwaiter()
.GetResult();
}
}
}
92 changes: 92 additions & 0 deletions chainflip-insights/Consumers/Telegram/TelegramConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
namespace ChainflipInsights.Consumers.Telegram
{
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using ChainflipInsights.Configuration;
using ChainflipInsights.Feeders.Swap;
using ChainflipInsights.Infrastructure.Pipelines;
using global::Telegram.Bot;
using global::Telegram.Bot.Types;
using global::Telegram.Bot.Types.Enums;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

public class TelegramConsumer
{
private readonly ILogger<TelegramConsumer> _logger;
private readonly BotConfiguration _configuration;
private readonly TelegramBotClient _telegramClient;

public TelegramConsumer(
ILogger<TelegramConsumer> logger,
IOptions<BotConfiguration> options,
TelegramBotClient telegramClient)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_configuration = options.Value ?? throw new ArgumentNullException(nameof(options));
_telegramClient = telegramClient ?? throw new ArgumentNullException(nameof(telegramClient));
}

public ITargetBlock<SwapInfo> Build(
CancellationToken ct)
{
var announcer = BuildAnnouncer(ct);
return new EncapsulatingTarget<SwapInfo, SwapInfo>(announcer, announcer);
}

private ActionBlock<SwapInfo> BuildAnnouncer(
CancellationToken ct)
{
var logging = new ActionBlock<SwapInfo>(
swap =>
{
if (!_configuration.EnableTelegram.Value)
return;
try
{
var text =
$"{swap.Emoji} Swapped " +
$"**{swap.DepositAmountFormatted} {swap.SourceAsset}** (*${swap.DepositValueUsdFormatted}*) → " +
$"**{swap.EgressAmountFormatted} {swap.DestinationAsset}** (*${swap.EgressValueUsdFormatted}*) " +
$"// **[view swap on explorer]({_configuration.ExplorerUrl}{swap.Id})**";
_telegramClient
.SendTextMessageAsync(
new ChatId(_configuration.TelegramSwapInfoChannelId.Value),
text,
parseMode: ParseMode.Markdown,
disableNotification: true,
allowSendingWithoutReply: true,
cancellationToken: ct)
.GetAwaiter()
.GetResult();
}
catch (Exception e)
{
_logger.LogError(e, "Telegram meh.");
}
Task
.Delay(1500, ct)
.GetAwaiter()
.GetResult();
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
CancellationToken = ct
});

logging.Completion.ContinueWith(
task => _logger.LogDebug(
"Telegram Logging completed, {Status}",
task.Status),
ct);

return logging;
}
}
}
108 changes: 108 additions & 0 deletions chainflip-insights/Consumers/Twitter/TwitterConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
namespace ChainflipInsights.Consumers.Twitter
{
using System;
using System.Net.Http.Headers;
using System.Net.Http.Json;
using System.Net.Mime;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using ChainflipInsights.Configuration;
using ChainflipInsights.Feeders.Swap;
using ChainflipInsights.Infrastructure.Pipelines;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Tweetinvi;

public class TwitterConsumer
{
private readonly ILogger<TwitterConsumer> _logger;
private readonly BotConfiguration _configuration;
private readonly TwitterClient _twitterClient;

public TwitterConsumer(
ILogger<TwitterConsumer> logger,
IOptions<BotConfiguration> options,
TwitterClient twitterClient)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_configuration = options.Value ?? throw new ArgumentNullException(nameof(options));
_twitterClient = twitterClient ?? throw new ArgumentNullException(nameof(twitterClient));
}

public ITargetBlock<SwapInfo> Build(
CancellationToken ct)
{
var announcer = BuildAnnouncer(ct);
return new EncapsulatingTarget<SwapInfo, SwapInfo>(announcer, announcer);
}

private ActionBlock<SwapInfo> BuildAnnouncer(
CancellationToken ct)
{
var logging = new ActionBlock<SwapInfo>(
swap =>
{
if (!_configuration.EnableTwitter.Value)
return;
try
{
var text =
$"{swap.Emoji} Swapped {_configuration.ExplorerUrl}{swap.Id}\n" +
$"➡️ {swap.DepositAmountFormatted} ${swap.SourceAsset} (${swap.DepositValueUsdFormatted})\n" +
$"⬅️ {swap.EgressAmountFormatted} ${swap.DestinationAsset} (${swap.EgressValueUsdFormatted})";
_twitterClient.Execute
.AdvanceRequestAsync(x =>
{
x.Query.Url = "https://api.twitter.com/2/tweets";
x.Query.HttpMethod = Tweetinvi.Models.HttpMethod.POST;
x.Query.HttpContent = JsonContent.Create(
new TweetV2PostRequest { Text = text },
mediaType: new MediaTypeHeaderValue(MediaTypeNames.Application.Json));
})
.GetAwaiter()
.GetResult();
}
catch (Exception e)
{
_logger.LogError(e, "Twitter meh.");
}
Task
.Delay(1500, ct)
.GetAwaiter()
.GetResult();
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
CancellationToken = ct
});

logging.Completion.ContinueWith(
task => _logger.LogDebug(
"Twitter Logging completed, {Status}",
task.Status),
ct);

return logging;
}
}

/// <summary>
/// There are a lot more fields according to:
/// https://developer.twitter.com/en/docs/twitter-api/tweets/manage-tweets/api-reference/post-tweets
/// but these are the ones we care about for our use case.
/// </summary>
public class TweetV2PostRequest
{
/// <summary>
/// The text of the tweet to post.
/// </summary>
[JsonPropertyName("text")]
public string Text { get; set; }
}
}
Loading

0 comments on commit 646f49d

Please sign in to comment.