10

使用Blazor WASM实现可取消的多文件带校验并发分片上传 - coredx

 11 months ago
source link: https://www.cnblogs.com/coredx/p/17746162.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

使用Blazor WASM实现可取消的多文件带校验并发分片上传

上传大文件时,原始HTTP文件上传功能可能会影响使用体验,此时使用分片上传功能可以有效避免原始上传的弊端。由于分片上传不是HTTP标准的一部分,所以只能自行开发相互配合的服务端和客户端。文件分片上传在许多情况时都拥有很多好处,除非已知需要上传的文件一定非常小。分片上传可以对上传的文件进行快速分片校验,避免大文件上传时长时间等待校验,当然完整校验可以在秒传时使用,有这种需求的情况就只能老实等待校验了。

Blazr WASM提供了在 .NET环境中使用浏览器功能的能力,充分利用C#和 .NET能够大幅简化分片上传功能的开发。本次示例使用HTTP标准上传作为分片上传的底层基础,并提供分片校验功能保障上传数据的完整性。

有关新书的更多介绍欢迎查看《C#与.NET6 开发从入门到实践》上市,作者亲自来打广告了!

image

本示例的Blazor代码位于默认ASP.NET Core托管的Blazor WASM应用模板的Index页面。

在Shared项目添加公共数据模型

/// <summary>/// 文件分片上传输入模型/// </summary>public class FileChunkUploadInput{ /// <summary> /// 上传任务代码 /// </summary> public string? UploadTaskCode { get; set; } /// <summary> /// 上传请求类型 /// </summary> public string UploadType { get; set; } = null!; /// <summary> /// 文件名 /// </summary> public string FileName { get; set; } = null!; /// <summary> /// 文件大小 /// </summary> public long? FileSize { get; set; } /// <summary> /// 支持的Hash算法,优选算法请靠前 /// </summary> public List<string>? AllowedHashAlgorithm { get; set; } /// <summary> /// 使用的Hash算法 /// </summary> public string? HashAlgorithm { get; set; } /// <summary> /// Hash值 /// </summary> public string? HashValue { get; set; } /// <summary> /// 文件分片数量 /// </summary> public int FileChunkCount { get; set; } /// <summary> /// 文件片段大小 /// </summary> public int? FileChunkSize { get; set; } /// <summary> /// 文件片段偏移量(相对于整个文件) /// </summary> public long? FileChunkOffset { get; set; } /// <summary> /// 文件片段索引 /// </summary> public int? FileChunkIndex { get; set; } /// <summary> /// 取消上传的原因 /// </summary> public string? CancelReason { get; set; }} /// <summary>/// 文件分片上传开始结果/// </summary>public class FileChunkUploadStartReault{ /// <summary> /// 上传任务代码 /// </summary> public string UploadTaskCode { get; set; } = null!; /// <summary> /// 选中的Hash算法 /// </summary> public string SelectedHashAlgorithm { get; set; } = null!;} /// <summary>/// Hash助手/// </summary>public static class HashHelper{ /// <summary> /// 把Hash的字节数组转换为16进制字符串表示 /// </summary> /// <param name="bytes">原始Hash值</param> /// <returns>Hash值的16进制文本表示(大写)</returns> public static string ToHexString(this byte[] bytes) { StringBuilder sb = new(bytes.Length * 2); foreach (var @byte in bytes) { sb.Append(@byte.ToString("X2")); } return sb.ToString(); }}

服务端控制器

[ApiController][Route("[controller]")]public class UploadController : ControllerBase{ /// <summary> /// 支持的Hash算法,优选算法请靠前 /// </summary> private static string[] supportedHashAlgorithm = new[] { "MD5", "SHA1", "SHA256" }; /// <summary> /// 文件写入锁的线程安全字典,每个上传任务对应一把锁 /// </summary> private static readonly ConcurrentDictionary<string, AsyncLock> fileWriteLockerDict = new(); private readonly ILogger<UploadController> _logger; private readonly IWebHostEnvironment _env; public UploadController(ILogger<UploadController> logger, IWebHostEnvironment env) { _logger = logger; _env = env; } /// <summary> /// 分片上传动作 /// </summary> /// <param name="input">上传表单</param> /// <param name="fileChunkData">文件片段数据</param> /// <param name="requestAborted">请求取消令牌</param> /// <returns>片段上传结果</returns> [HttpPost, RequestSizeLimit(1024 * 1024 * 11)] [ProducesResponseType(StatusCodes.Status200OK)] [ProducesResponseType(StatusCodes.Status400BadRequest)] [ProducesDefaultResponseType] public async Task<IActionResult> Upload( [FromForm]FileChunkUploadInput input, [FromForm]IFormFile? fileChunkData, CancellationToken requestAborted) { switch (input.UploadType) { // 请求开始一个新的上传任务,协商上传参数 case "startUpload": { //var trustedFileNameForDisplay = // WebUtility.HtmlEncode(fileChunkData?.FileName ?? input.FileName); // 选择双方都支持的优选Hash算法 var selectedHashAlgorithm = supportedHashAlgorithm .Intersect(input.AllowedHashAlgorithm ?? Enumerable.Empty<string>()) .FirstOrDefault(); // 验证必要的表单数据 if (selectedHashAlgorithm is null or "") { ModelState.AddModelError<FileChunkUploadInput>(x => x.AllowedHashAlgorithm, "can not select hash algorithm"); } if (input.FileSize is null) { ModelState.AddModelError<FileChunkUploadInput>(x => x.FileSize, "must have value for start、upload and complete"); } if (ModelState.ErrorCount > 0) { return ValidationProblem(ModelState); } // 使用随机文件名提高安全性,并把文件名作为任务代码使用 var trustedFileNameForFileStorage = Path.GetRandomFileName(); var savePath = Path.Combine( _env.ContentRootPath, _env.EnvironmentName, "unsafe_uploads", trustedFileNameForFileStorage); var savePathWithFile = Path.Combine( savePath, $"{input.FileName}.tmp"); if (!Directory.Exists(savePath)) { Directory.CreateDirectory(savePath); } // 根据表单创建对应大小的文件 await using (var fs = new FileStream(savePathWithFile, FileMode.Create)) { fs.SetLength(input.FileSize!.Value); await fs.FlushAsync(); } // 设置锁 fileWriteLockerDict.TryAdd(trustedFileNameForFileStorage, new()); // 返回协商结果 return Ok(new FileChunkUploadStartReault { UploadTaskCode = trustedFileNameForFileStorage, SelectedHashAlgorithm = selectedHashAlgorithm! }); } // 上传文件片段 case "uploadChunk": // 验证表单 if (!fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var _)) { ModelState.AddModelError<FileChunkUploadInput>(x => x.UploadTaskCode, $"file upload task with code {input.UploadTaskCode} is not exists"); return ValidationProblem(ModelState); } // 使用内存池缓冲数据,注意使用using释放内存 using (var pooledMemory = MemoryPool<byte>.Shared.Rent((int)fileChunkData!.Length)) { // 使用切片语法获取精准大小的内存缓冲区装载上传的数据 var buffer = pooledMemory.Memory[..(int)fileChunkData!.Length]; var readBytes = await fileChunkData.OpenReadStream().ReadAsync(buffer, requestAborted); var readBuffer = buffer[..readBytes]; Debug.Assert(readBytes == fileChunkData!.Length); // 校验Hash var hash = input.HashAlgorithm switch { "SHA1" => SHA1.HashData(readBuffer.Span), "SHA256" => SHA256.HashData(readBuffer.Span), "MD5" => MD5.HashData(readBuffer.Span), _ => Array.Empty<byte>() }; if (hash.ToHexString() != input.HashValue) { ModelState.AddModelError<FileChunkUploadInput>(x => x.HashValue, "hash does not match"); return ValidationProblem(ModelState); } var savePath = Path.Combine( _env.ContentRootPath, _env.EnvironmentName, "unsafe_uploads", input.UploadTaskCode!); var savePathWithFile = Path.Combine( savePath, $"{input.FileName}.tmp"); // 使用锁写入数据,文件流不支持写共享,必须串行化 if(fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var locker)) { using (await locker.LockAsync()) { await using (var fs = new FileStream(savePathWithFile, FileMode.Open, FileAccess.Write)) { // 定位文件流 fs.Seek(input.FileChunkOffset!.Value, SeekOrigin.Begin); await fs.WriteAsync(readBuffer, requestAborted); await fs.FlushAsync(); } } } } return Ok(); // 取消上传 case "cancelUpload": // 验证表单 if (!fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var _)) { ModelState.AddModelError<FileChunkUploadInput>(x => x.UploadTaskCode, $"file upload task with code {input.UploadTaskCode} is not exists"); return ValidationProblem(ModelState); } { var deletePath = Path.Combine( _env.ContentRootPath, _env.EnvironmentName, "unsafe_uploads", input.UploadTaskCode!); // 删除文件,清除锁 if (fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var locker)) { using (await locker.LockAsync()) { if (Directory.Exists(deletePath)) { var dir = new DirectoryInfo(deletePath); dir.Delete(true); } fileWriteLockerDict.TryRemove(input.UploadTaskCode!, out _); } } } return Ok(); // 完成上传 case "completeUpload": // 验证表单 if (!fileWriteLockerDict.TryGetValue(input.UploadTaskCode!, out var _)) { ModelState.AddModelError<FileChunkUploadInput>(x => x.UploadTaskCode, $"file upload task with code {input.UploadTaskCode} is not exists"); return ValidationProblem(ModelState); } { var savePath = Path.Combine( _env.ContentRootPath, _env.EnvironmentName, "unsafe_uploads", input.UploadTaskCode!); // 去除文件的临时扩展名,清除锁 var savePathWithFile = Path.Combine(savePath, $"{input.FileName}.tmp"); var fi = new FileInfo(savePathWithFile); fi.MoveTo(Path.Combine(savePath, input.FileName)); fileWriteLockerDict.TryRemove(input.UploadTaskCode!, out _); } return Ok(); default: return BadRequest(); } }}

服务端使用三段式上传模式,开始上传,上传数据,完成(取消)上传。开始上传负责协商Hash算法和分配任务代码;上传数据负责具体的传输,并通过表单提供附加信息方便服务端操作。完成上传负责善后和资源清理。其中文件写入的异步锁使用Nito.AsyncEx代替不支持在异步中使用的lock语句。

页面代码(Index.razor),在结尾追加

<p>支持随时取消的多文件并行分片上传,示例同时上传2个文件,每个文件同时上传2个分片,合计同时上传4个分片</p><InputFile OnChange="UploadFile" multiple></InputFile><button @onclick="async (MouseEventArgs e) => uploadCancelSource?.Cancel()">取消上传</button> @code{ [Inject] private HttpClient _http { get; init; } = null!; [Inject] private ILogger<Index> _logger { get; init; } = null!; private CancellationTokenSource? uploadCancelSource; /// <summary> /// 上传文件 /// </summary> /// <param name="args">上传文件的事件参数</param> /// <returns></returns> private async Task UploadFile(InputFileChangeEventArgs args) { // 设置文件并发选项 var parallelCts = new CancellationTokenSource(); uploadCancelSource = parallelCts; var parallelOption = new ParallelOptions { MaxDegreeOfParallelism = 2, CancellationToken = parallelCts.Token }; // 并发上传所有文件 await Parallel.ForEachAsync( args.GetMultipleFiles(int.MaxValue), parallelOption, async (file, cancellation) => { // 这里的取消令牌是并发方法创建的,和并发选项里的令牌不是一个 if (cancellation.IsCancellationRequested) { parallelCts.Cancel(); return; } // 使用链接令牌确保外部取消能传递到内部 var chunkUploadResult = await UploadChunkedFile( file, CancellationTokenSource.CreateLinkedTokenSource( parallelCts.Token, cancellation ).Token ); // 如果上传不成功则取消后续上传 if (chunkUploadResult != FileUploadResult.Success) { parallelCts.Cancel(); return; } } ); } /// <summary> /// 分片上传文件 /// </summary> /// <param name="file">要上传的文件</param> /// <param name="cancellation">取消令牌</param> /// <returns>上传结果</returns> private async Task<FileUploadResult> UploadChunkedFile(IBrowserFile file, CancellationToken cancellation = default) { if (cancellation.IsCancellationRequested) return FileUploadResult.Canceled; _logger.LogInformation("开始上传文件:{0}", file.Name); // 计算分片大小,文件小于10MB分片1MB,大于100MB分片10MB,在其间则使用不超过10片时的所需大小 var coefficient = file.Size switch { <= 1024 * 1024 * 10 => 1, > 1024 * 1024 * 10 and <= 1024 * 1024 *100 => (int)Math.Ceiling(file.Size / (1024.0 * 1024) / 10), _ => 10 }; // 初始化分片参数,准备字符串格式的数据供表单使用 var bufferSize = 1024 * 1024 * coefficient; // MB var stringBufferSize = bufferSize.ToString(); var chunkCount = (int)Math.Ceiling(file.Size / (double)bufferSize); var stringChunkCount = chunkCount.ToString(); var stringFileSize = file.Size.ToString(); // 发起分片上传,协商Hash算法,获取任务代码 var uploadStartContent = new List<KeyValuePair<string, string>> { new("uploadType", "startUpload"), new("fileName", file.Name), new("fileSize", stringFileSize), new("allowedHashAlgorithm", "SHA1"), new("allowedHashAlgorithm", "SHA256"), new("fileChunkCount", stringChunkCount), new("fileChunkSize", stringBufferSize), }; var uploadStartForm = new FormUrlEncodedContent(uploadStartContent); HttpResponseMessage? uploadStartResponse = null; try { uploadStartResponse = await _http.PostAsync("/upload", uploadStartForm, cancellation); } catch(TaskCanceledException e) { _logger.LogWarning(e, "外部取消上传,已停止文件:{0} 的上传", file.Name); return FileUploadResult.Canceled; } catch(Exception e) { _logger.LogError(e, "文件:{0} 的上传参数协商失败", file.Name); return FileUploadResult.Fail; } // 如果服务器响应失败,结束上传 if (uploadStartResponse?.IsSuccessStatusCode is null or false) { _logger.LogError("文件:{0} 的上传参数协商失败", file.Name); return FileUploadResult.Fail; } // 解析协商的参数 var uploadStartReault = await uploadStartResponse.Content.ReadFromJsonAsync<FileChunkUploadStartReault>(); var uploadTaskCode = uploadStartReault!.UploadTaskCode; var selectedHashAlgorithm = uploadStartReault!.SelectedHashAlgorithm; _logger.LogInformation("文件:{0} 的上传参数协商成功", file.Name); // 设置分片并发选项 var parallelOption = new ParallelOptions { MaxDegreeOfParallelism = 2, }; var fileUploadCancelSource = new CancellationTokenSource(); var sliceEnumeratorCancelSource = CancellationTokenSource .CreateLinkedTokenSource( cancellation, fileUploadCancelSource.Token ); // 各个分片的上传结果 var sliceUploadResults = new FileUploadResult?[chunkCount]; // 并发上传各个分片,并发循环本身不能用并发选项的取消令牌取消,可能会导致内存泄漏,应该通过切片循环的取消使并发循环因没有可用元素自然结束 await Parallel.ForEachAsync( SliceFileAsync( file, bufferSize, sliceEnumeratorCancelSource.Token ), parallelOption, async (fileSlice, sliceUploadCancel) => { // 解构参数 var (memory, sliceIndex, readBytes, fileOffset) = fileSlice; // 使用using确保结束后把租用的内存归还给内存池 using (memory) { var stringSliceIndex = sliceIndex.ToString(); // 主动取消上传,发送取消请求,通知服务端清理资源 if (sliceUploadCancel.IsCancellationRequested) { _logger.LogWarning("外部取消上传,已停止文件:{0} 的上传", file.Name); fileUploadCancelSource.Cancel(); sliceUploadResults[sliceIndex] = FileUploadResult.Canceled; var uploadCancelContent = new Dictionary<string, string>() { {"uploadType", "cancelUpload"}, {"uploadTaskCode", uploadTaskCode!}, {"fileName", file.Name}, {"hashAlgorithm", selectedHashAlgorithm}, {"fileChunkCount", stringChunkCount}, {"fileChunkIndex", stringSliceIndex}, {"cancelReason", "调用方要求取消上传。"}, }; var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent); var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm); return; } // 当前上传分片索引应当小于预计的分片数 Debug.Assert(sliceIndex < chunkCount); // 获取准确大小的缓冲区,从内存池租用时得到的容量可能大于申请的大小,使用C#的新集合切片语法 var readBuffer = memory.Memory[..readBytes]; var sw = Stopwatch.StartNew(); // 根据协商的算法计算Hash,wasm环境不支持MD5和全部非对称加密算法 var hash = selectedHashAlgorithm switch { "SHA1" => SHA1.HashData(readBuffer.Span), "SHA256" => SHA256.HashData(readBuffer.Span), _ => Array.Empty<byte>() }; sw.Stop(); _logger.LogInformation("文件:{0} 的片段 {1}({2} Bytes) 计算Hash用时 {3}", file.Name, sliceIndex, readBytes, sw.Elapsed); var stringReadBytes = readBytes.ToString(); var stringFileOffset = fileOffset.ToString(); // 上传当前分片 MultipartFormDataContent uploadFileForm = new(); uploadFileForm.Add(new StringContent(uploadTaskCode!), "uploadTaskCode"); uploadFileForm.Add(new StringContent("uploadChunk"), "uploadType"); uploadFileForm.Add(new StringContent(file.Name), "fileName"); uploadFileForm.Add(new StringContent(stringFileSize), "fileSize"); uploadFileForm.Add(new StringContent(selectedHashAlgorithm!), "hashAlgorithm"); uploadFileForm.Add(new StringContent(hash.ToHexString()), "hashValue"); uploadFileForm.Add(new StringContent(stringChunkCount), "fileChunkCount"); uploadFileForm.Add(new StringContent(stringReadBytes), "fileChunkSize"); uploadFileForm.Add(new StringContent(stringFileOffset), "fileChunkOffset"); uploadFileForm.Add(new StringContent(stringSliceIndex), "fileChunkIndex"); // 如果是未知的文件类型,设置为普通二进制流的MIME类型 var fileChunk = new ReadOnlyMemoryContent(readBuffer); fileChunk.Headers.ContentType = new MediaTypeHeaderValue(string.IsNullOrEmpty(file.ContentType) ? "application/octet-stream" : file.ContentType); uploadFileForm.Add(fileChunk, "fileChunkData", file.Name); HttpResponseMessage? uploadResponse = null; try { var uploadTaskCancel = CancellationTokenSource .CreateLinkedTokenSource( sliceUploadCancel, sliceEnumeratorCancelSource.Token ); _logger.LogInformation("文件:{0} 的片段 {1}({2} Bytes) 开始上传", file.Name, sliceIndex, readBytes); sw.Restart(); uploadResponse = await _http.PostAsync("/upload", uploadFileForm, uploadTaskCancel.Token); } catch (TaskCanceledException e) { _logger.LogWarning(e, "外部取消上传,已停止文件:{0} 的上传", file.Name); fileUploadCancelSource.Cancel(); sliceUploadResults[sliceIndex] = FileUploadResult.Canceled; var uploadCancelContent = new Dictionary<string, string>() { {"uploadType", "cancelUpload"}, {"uploadTaskCode", uploadTaskCode!}, {"fileName", file.Name}, {"hashAlgorithm", selectedHashAlgorithm}, {"fileChunkCount", stringChunkCount}, {"fileChunkIndex", stringSliceIndex}, {"cancelReason", "调用方要求取消上传。"}, }; var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent); var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm); return; } catch (Exception e) { _logger.LogError(e, "上传发生错误,已停止文件:{0} 的上传", file.Name); fileUploadCancelSource.Cancel(); sliceUploadResults[sliceIndex] = FileUploadResult.Fail; var uploadCancelContent = new Dictionary<string, string>() { {"uploadType", "cancelUpload"}, {"uploadTaskCode", uploadTaskCode!}, {"fileName", file.Name}, {"hashAlgorithm", selectedHashAlgorithm}, {"fileChunkCount", stringChunkCount}, {"fileChunkSize", stringReadBytes}, {"fileChunkOffset", stringFileOffset}, {"fileChunkIndex", stringSliceIndex}, {"cancelReason", "上传过程中发生错误。"}, }; var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent); var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm); return; } finally { sw.Stop(); } // 上传发生错误,发送取消请求,通知服务端清理资源 if (uploadResponse?.IsSuccessStatusCode is null or false) { _logger.LogError("上传发生错误,已停止文件:{0} 的上传", file.Name); fileUploadCancelSource.Cancel(); sliceUploadResults[sliceIndex] = FileUploadResult.Fail; var uploadCancelContent = new Dictionary<string, string>() { {"uploadType", "cancelUpload"}, {"uploadTaskCode", uploadTaskCode!}, {"fileName", file.Name}, {"hashAlgorithm", selectedHashAlgorithm}, {"fileChunkCount", stringChunkCount}, {"fileChunkSize", stringReadBytes}, {"fileChunkOffset", stringFileOffset}, {"fileChunkIndex", stringSliceIndex}, {"cancelReason", "上传过程中发生错误。"}, }; var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent); var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm); return; } _logger.LogInformation("文件:{0} 的片段 {1}({2} Bytes) 上传成功,用时 {3}", file.Name, sliceIndex, readBytes, sw.Elapsed); sliceUploadResults[sliceIndex] = FileUploadResult.Success; } } ); // 如果所有分片都上传成功,则发送完成请求完成上传 if (sliceUploadResults.All(success => success is FileUploadResult.Success)) { var uploadCompleteContent = new Dictionary<string, string>() { {"uploadType", "completeUpload"}, {"uploadTaskCode", uploadTaskCode!}, {"fileName", file.Name}, {"fileSize", stringFileSize}, {"hashAlgorithm", selectedHashAlgorithm}, {"fileChunkCount", stringChunkCount}, {"fileChunkSize", stringBufferSize}, }; var uploadCompleteForm = new FormUrlEncodedContent(uploadCompleteContent); var uploadCompleteResponse = await _http.PostAsync("/upload", uploadCompleteForm); if (uploadCompleteResponse.IsSuccessStatusCode) { _logger.LogInformation("文件:{0} 上传成功,共 {1} 个片段", file.Name, chunkCount); return FileUploadResult.Success; } else { _logger.LogError("上传发生错误,已停止文件:{0} 的上传", file.Name); var uploadCancelContent = new Dictionary<string, string>() { {"uploadType", "cancelUpload"}, {"uploadTaskCode", uploadTaskCode!}, {"fileName", file.Name}, {"hashAlgorithm", selectedHashAlgorithm}, {"fileChunkCount", stringChunkCount}, {"cancelReason", "上传过程中发生错误。"}, }; var uploadCancelForm = new FormUrlEncodedContent(uploadCancelContent); var uploadCancelResponse = await _http.PostAsync("/upload", uploadCancelForm); return FileUploadResult.Fail; } } else if (sliceUploadResults.Any(success => success is FileUploadResult.Fail)) { return FileUploadResult.Fail; } else { return FileUploadResult.Canceled; } } /// <summary> /// 异步切分要上传的文件 /// <br/>如果想中途结束切分,不要在调用此方法的foreach块中使用break,请使用取消令牌,否则会出现内存泄漏 /// </summary> /// <param name="file">要分片的文件</param> /// <param name="sliceSize">分片大小</param> /// <param name="cancellation">取消令牌</param> /// <returns>已切分的文件片段数据,用完切记释放其中的内存缓冲</returns> private static async IAsyncEnumerable<(IMemoryOwner<byte> memory, int sliceIndex, int readBytes, long fileOffset)> SliceFileAsync( IBrowserFile file, int sliceSize, [EnumeratorCancellation] CancellationToken cancellation = default) { if (cancellation.IsCancellationRequested) yield break; int fileSliceIndex; long fileOffset; IMemoryOwner<byte> memory; await using var fileStream = file.OpenReadStream(long.MaxValue); for (fileSliceIndex = 0, fileOffset = 0, memory = MemoryPool<byte>.Shared.Rent(sliceSize); (await fileStream.ReadAsync(memory.Memory[..sliceSize], cancellation)) is int readBytes and > 0; fileSliceIndex++, fileOffset += readBytes, memory = MemoryPool<byte>.Shared.Rent(sliceSize) ) { if(cancellation.IsCancellationRequested) { // 如果取消切分,缓冲不会返回到外部,只能在内部释放 memory.Dispose(); yield break; } yield return (memory, fileSliceIndex, readBytes, fileOffset); } // 切分结束后会多出一个没用的缓冲,只能在内部释放 memory.Dispose(); } /// <summary> /// 上传结果 /// </summary> public enum FileUploadResult { /// <summary> /// 失败 /// </summary> Fail = -2, /// <summary> /// 取消 /// </summary> Canceled = -1, /// <summary> /// 没有结果,未知结果 /// </summary> None = 0, /// <summary> /// 成功 /// </summary> Success = 1 }}

示例使用Parallel.ForEachAsync方法并行启动多个文件和每个文件的多个片段的上传,并发量由方法的参数控制。UploadChunkedFile方法负责单个文件的上传,其中的IBrowserFile类型是.NET 6新增的文件选择框选中项的包装,可以使用其中的OpenReadStream方法流式读取文件数据,确保大文件上传不会在内存中缓冲所有数据导致内存占用问题。

UploadChunkedFile方法内部使用自适应分片大小算法,规则为片段最小1MB,最大10MB,尽可能平均分为10份。得出片段大小后向服务端请求开始上传文件,服务端成功返回后开始文件切分、校验和上传。

SliceFileAsync负责切分文件并流式返回每个片段,切分方法是惰性的,所以不用担心占用大量内存,但是这个方法只能使用取消令牌中断切分,如果在调用该方法的await foreach块中使用break中断会产生内存泄漏。切分完成后会返回包含片段数据的内存缓冲和其他附加信息。OpenReadStream需要使用参数控制允许读取的最大字节数(默认512KB),因为这里是分片上传,直接设置为long.MaxValue即可。for循环头使用逗号表达式定义多个循环操作,使循环体的代码清晰简洁。

UploadChunkedFile方法使用Parallel.ForEachAsync并行启动多个片段的校验和上传,WASM中不支持MD5和所有非对称加密算法,需要注意。完成文件的并行上传或发生错误后会检查所有片段的上传情况,如果所有片段都上传成功,就发送完成上传请求通知服务端收尾善后,否则删除临时文件。

这应该是一个比较清晰易懂的分片上传示例。示例使用Blazor 和C#以非常流畅的异步代码实现了并发分片上传。但是本示例依然有许多可优化的点,例如实现断点续传,服务端如果没有收到结束请求时的兜底处理等,这些就留给朋友们思考了。

又是很久没有写文章了,一直没有找到什么好选题,难得找到一个,经过将近1周的研究开发终于搞定了。

读者交流QQ群:540719365

image

欢迎读者和广大朋友一起交流,如发现本书错误也欢迎通过博客园、QQ群等方式告知我。

本文地址:https://www.cnblogs.com/coredx/p/17746162.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK