Fix bubble party + Download queue + Allow pause user in syncshell + add visual feature + clean log info
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
using System.Net.Http.Headers;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using MareSynchronos.WebAPI.SignalR;
|
||||
using MareSynchronos.Services.AutoDetect;
|
||||
@@ -65,15 +66,21 @@ public class DiscoveryApiClient
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<bool> SendRequestAsync(string endpoint, string token, string? displayName, CancellationToken ct)
|
||||
public async Task<bool> SendRequestAsync(string endpoint, string? token, string? targetUid, string? displayName, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (string.IsNullOrEmpty(token) && string.IsNullOrEmpty(targetUid))
|
||||
{
|
||||
_logger.LogWarning("Discovery request aborted: no token or targetUid provided");
|
||||
return false;
|
||||
}
|
||||
|
||||
var jwt = await _tokenProvider.GetOrUpdateToken(ct).ConfigureAwait(false);
|
||||
if (string.IsNullOrEmpty(jwt)) return false;
|
||||
using var req = new HttpRequestMessage(HttpMethod.Post, endpoint);
|
||||
req.Headers.Authorization = new AuthenticationHeaderValue("Bearer", jwt);
|
||||
var body = JsonSerializer.Serialize(new { token, displayName });
|
||||
var body = JsonSerializer.Serialize(new RequestPayload(token, targetUid, displayName));
|
||||
req.Content = new StringContent(body, Encoding.UTF8, "application/json");
|
||||
var resp = await _httpClient.SendAsync(req, ct).ConfigureAwait(false);
|
||||
if (resp.StatusCode == System.Net.HttpStatusCode.Unauthorized)
|
||||
@@ -82,7 +89,7 @@ public class DiscoveryApiClient
|
||||
if (string.IsNullOrEmpty(jwt2)) return false;
|
||||
using var req2 = new HttpRequestMessage(HttpMethod.Post, endpoint);
|
||||
req2.Headers.Authorization = new AuthenticationHeaderValue("Bearer", jwt2);
|
||||
var body2 = JsonSerializer.Serialize(new { token, displayName });
|
||||
var body2 = JsonSerializer.Serialize(new RequestPayload(token, targetUid, displayName));
|
||||
req2.Content = new StringContent(body2, Encoding.UTF8, "application/json");
|
||||
resp = await _httpClient.SendAsync(req2, ct).ConfigureAwait(false);
|
||||
}
|
||||
@@ -102,6 +109,14 @@ public class DiscoveryApiClient
|
||||
}
|
||||
}
|
||||
|
||||
private sealed record RequestPayload(
|
||||
[property: JsonPropertyName("token"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
string? Token,
|
||||
[property: JsonPropertyName("targetUid"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
string? TargetUid,
|
||||
[property: JsonPropertyName("displayName"), JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||
string? DisplayName);
|
||||
|
||||
public async Task<bool> PublishAsync(string endpoint, IEnumerable<string> hashes, string? displayName, CancellationToken ct, bool allowRequests = true)
|
||||
{
|
||||
try
|
||||
|
||||
@@ -4,6 +4,7 @@ using MareSynchronos.API.Data;
|
||||
using MareSynchronos.API.Dto.Files;
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronos.FileCache;
|
||||
using MareSynchronos.MareConfiguration;
|
||||
using MareSynchronos.PlayerData.Handlers;
|
||||
using MareSynchronos.Services.Mediator;
|
||||
using MareSynchronos.Utils;
|
||||
@@ -20,17 +21,22 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
private readonly Dictionary<string, FileDownloadStatus> _downloadStatus;
|
||||
private readonly FileCompactor _fileCompactor;
|
||||
private readonly FileCacheManager _fileDbManager;
|
||||
private readonly MareConfigService _mareConfigService;
|
||||
private readonly FileTransferOrchestrator _orchestrator;
|
||||
private readonly List<ThrottledStream> _activeDownloadStreams;
|
||||
private readonly object _queueLock = new();
|
||||
private SemaphoreSlim? _downloadQueueSemaphore;
|
||||
private int _downloadQueueCapacity = -1;
|
||||
|
||||
public FileDownloadManager(ILogger<FileDownloadManager> logger, MareMediator mediator,
|
||||
FileTransferOrchestrator orchestrator,
|
||||
FileCacheManager fileCacheManager, FileCompactor fileCompactor) : base(logger, mediator)
|
||||
FileCacheManager fileCacheManager, FileCompactor fileCompactor, MareConfigService mareConfigService) : base(logger, mediator)
|
||||
{
|
||||
_downloadStatus = new Dictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
|
||||
_orchestrator = orchestrator;
|
||||
_fileDbManager = fileCacheManager;
|
||||
_fileCompactor = fileCompactor;
|
||||
_mareConfigService = mareConfigService;
|
||||
_activeDownloadStreams = [];
|
||||
|
||||
Mediator.Subscribe<DownloadLimitChangedMessage>(this, (msg) =>
|
||||
@@ -59,6 +65,14 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
public async Task DownloadFiles(GameObjectHandler gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct)
|
||||
{
|
||||
SemaphoreSlim? queueSemaphore = null;
|
||||
if (_mareConfigService.Current.EnableDownloadQueue)
|
||||
{
|
||||
queueSemaphore = GetQueueSemaphore();
|
||||
Logger.LogTrace("Queueing download for {name}. Currently queued: {queued}", gameObject.Name, queueSemaphore.CurrentCount);
|
||||
await queueSemaphore.WaitAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));
|
||||
try
|
||||
{
|
||||
@@ -70,6 +84,11 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (queueSemaphore != null)
|
||||
{
|
||||
queueSemaphore.Release();
|
||||
}
|
||||
|
||||
Mediator.Publish(new DownloadFinishedMessage(gameObject));
|
||||
Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
|
||||
}
|
||||
@@ -132,6 +151,22 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
return (string.Join("", hashName), long.Parse(string.Join("", fileLength)));
|
||||
}
|
||||
|
||||
private SemaphoreSlim GetQueueSemaphore()
|
||||
{
|
||||
var desiredCapacity = Math.Clamp(_mareConfigService.Current.ParallelDownloads, 1, 10);
|
||||
|
||||
lock (_queueLock)
|
||||
{
|
||||
if (_downloadQueueSemaphore == null || _downloadQueueCapacity != desiredCapacity)
|
||||
{
|
||||
_downloadQueueSemaphore = new SemaphoreSlim(desiredCapacity, desiredCapacity);
|
||||
_downloadQueueCapacity = desiredCapacity;
|
||||
}
|
||||
|
||||
return _downloadQueueSemaphore;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task DownloadAndMungeFileHttpClient(string downloadGroup, Guid requestId, List<DownloadFileTransfer> fileTransfer, string tempPath, IProgress<long> progress, CancellationToken ct)
|
||||
{
|
||||
Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, string.Join(", ", fileTransfer.Select(c => c.Hash).ToList()));
|
||||
|
||||
Reference in New Issue
Block a user