Stream.CopyToAsync with progress reporting - progress is reported even after copying finishes

Misiu · Sep 28, 2016

I've built a simple console application that downloads files from the internet.
Because I had problems with WebClient, I decided to write my app using HttpClient.

Basically, I send a request that reads only the response headers, then get the response stream with ReadAsStreamAsync and copy it to a local file using CopyToAsync.

I found an extension method for Stream that supports IProgress<T>:

public static class StreamExtensions
{
    public static async Task CopyToAsync(this Stream source, Stream destination, IProgress<long> progress, CancellationToken cancellationToken = default(CancellationToken), int bufferSize = 0x1000)
    {
        var buffer = new byte[bufferSize];
        int bytesRead;
        long totalRead = 0;
        while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
        {
            await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken);
            cancellationToken.ThrowIfCancellationRequested();
            totalRead += bytesRead;
            //Thread.Sleep(10);
            progress.Report(totalRead);
        }
    }
}

My application works, but I get incorrect progress information.
For example, when downloading two files I see this in the Output window:

file1.tmp 60.95%
file2.tmp 98.09%
file1.tmp 60.98%
file2.tmp 98.21%
file2.tmp 98.17%
file2.tmp 98.25%
file1.tmp 61.02%
file2.tmp 98.41%
file2.tmp downloaded
file2.tmp 98.29%
file2.tmp 98.37%
file1.tmp 61.06%
file2.tmp 89.27%
file2.tmp 89.31%
file2.tmp 98.33%
file2.tmp 98.45%
file2.tmp 98.48%
file1.tmp 61.10%
file1.tmp 61.14%
file2.tmp 98.52%
file1.tmp 61.22%
file2.tmp 98.60%
file2.tmp 98.56%
file1.tmp 61.30%
file2.tmp 98.88%
file2.tmp 90.44%
file1.tmp 61.53%
file2.tmp 98.72%
file1.tmp 61.41%
file1.tmp 61.73%
file2.tmp 98.80%
file1.tmp 61.26%
file1.tmp 61.49%
file1.tmp 61.57%
file1.tmp 61.69%
...
file1.tmp 99.31%
file1.tmp 98.84%
file1.tmp 98.80%
file1.tmp 99.04%
file1.tmp 99.43%
file1.tmp 99.12%
file1.tmp 99.00%
file1.tmp downloaded
file1.tmp 100.00%
file1.tmp 98.73%
file1.tmp 98.88%
file1.tmp 99.47%
file1.tmp 99.98%
file1.tmp 99.90%
file1.tmp 98.96%
file1.tmp 99.78%
file1.tmp 99.99%
file1.tmp 99.74%
file1.tmp 99.59%
file1.tmp 99.94%
file1.tmp 98.49%
file1.tmp 98.53%
ALL FILES DOWNLOADED
file1.tmp 99.55%
file1.tmp 98.41%
file1.tmp 99.62%
file1.tmp 98.34%
file1.tmp 99.66%
file1.tmp 98.69%
file1.tmp 98.37%

As you can see, I'm told that file2 is downloaded, but I still get progress reports from CopyToAsync afterwards. The same happens with file1.

Because of that, I sometimes get weird console output.

Ideally, I'd like to be sure that when I call:

await streamToReadFrom.CopyToAsync(streamToWriteTo, progress, source.Token,0x2000);
Debug.WriteLine(filename+" downloaded");

no more progress is reported after that debug message appears (the file is downloaded). I thought that await would solve my problem, but it doesn't.

How can I fix this? As a temporary workaround, I'm adding Thread.Sleep to CopyToAsync just before I report progress.

Below is my current code:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncDownloadTest
{
    class Program
    {
        private const string LocalPath = @"D:\TEMP";

        static void Main()
        {
            try
            {
                var filesToDownlad = new List<Tuple<string, string>>
                {
                    new Tuple<string, string>("file1.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip"),
                    new Tuple<string, string>("file2.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip")
                };
                _consolePosition = -1;
                Console.CursorVisible = false;

                Parallel.ForEach(filesToDownlad, new ParallelOptions { MaxDegreeOfParallelism = 4 }, doc =>
                {
                    DownloadFile(doc.Item2,doc.Item1).Wait();
                });
                Debug.WriteLine("ALL FILES DOWNLOADED");
                Console.CursorVisible = true;    
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                Console.ReadLine();
            }
        }

        private static readonly object ConsoleLock = new object();
        private static int _consolePosition;

        static readonly CancellationTokenSource source = new CancellationTokenSource();

        private static async Task DownloadFile(string url, string filename)
        {
            int currenctLineNumber = 0;
            int currectProgress = 0;

            try
            {
                lock (ConsoleLock)
                {
                    _consolePosition++;
                    currenctLineNumber = _consolePosition;
                }

                long fileSize = -1;

                IProgress<long> progress = new Progress<long>(value =>
                {
                    decimal tmp = (decimal)(value * 100) / fileSize;

                    if (tmp != currectProgress && tmp > currectProgress)
                    {
                        lock (ConsoleLock)
                        {
                            currectProgress = (int)tmp;
                            Console.CursorTop = currenctLineNumber;
                            Console.CursorLeft = 0;
                            Console.Write("{0,10} - {2,11} - {1,6:N2}%", filename, tmp, "DOWNLOADING");
                        }
                        Debug.WriteLine("{1} {0:N2}%", tmp, filename);
                    }
                });

                using (HttpClient client = new HttpClient())
                {
                    using (HttpResponseMessage response = await client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, source.Token))
                    {
                        response.EnsureSuccessStatusCode();
                        if (response.Content.Headers.ContentLength.HasValue) fileSize = response.Content.Headers.ContentLength.Value;

                        if (response.Content.Headers.ContentDisposition != null)
                        {
                            var tmp = response.Content.Headers.ContentDisposition.FileName.Replace("\"", "");
                            Debug.WriteLine("Real name: {0}",tmp);
                        }

                        using (Stream streamToReadFrom = await response.Content.ReadAsStreamAsync())
                        {
                            using (Stream streamToWriteTo = File.Open(Path.Combine(LocalPath, filename), FileMode.Create, FileAccess.Write))
                            {
                                await streamToReadFrom.CopyToAsync(streamToWriteTo, progress, source.Token,0x2000);

                                Debug.WriteLine(filename+" downloaded");

                                lock (ConsoleLock)
                                {
                                    Console.CursorTop = currenctLineNumber;
                                    Console.CursorLeft = 0;
                                    var oldColor = Console.ForegroundColor;
                                    Console.ForegroundColor = ConsoleColor.Green;
                                    Console.Write("{0,10} - {2,11} - {1,6:N2}%", filename, 100, "SUCCESS");
                                    Console.ForegroundColor = oldColor;
                                }
                            }
                        }
                    }
                }
            }
            catch (Exception e)
            {
                Debug.WriteLine(e.Message);
                lock (ConsoleLock)
                {
                    Console.CursorTop = currenctLineNumber;
                    Console.CursorLeft = 0;
                    var oldColor = Console.ForegroundColor;
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.Write("{0,10} - {2,11} - {1,6:N2}%", filename, currectProgress, "ERROR");
                    Console.ForegroundColor = oldColor;
                }
            }
        }
    }

    public static class StreamExtensions
    {
        public static async Task CopyToAsync(this Stream source, Stream destination, IProgress<long> progress, CancellationToken cancellationToken = default(CancellationToken), int bufferSize = 0x1000)
        {
            var buffer = new byte[bufferSize];
            int bytesRead;
            long totalRead = 0;
            while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
            {
                await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken);
                cancellationToken.ThrowIfCancellationRequested();
                totalRead += bytesRead;
                Thread.Sleep(10);
                progress.Report(totalRead);
            }
        }
    }
}

Answer

Stephen Cleary · Sep 28, 2016

Your problem is actually here:

new Progress<long>

The Progress<T> class captures the current SynchronizationContext when it is constructed and invokes its callbacks through it. A console application has no SynchronizationContext, so the callbacks go to the thread pool instead. This means that when the progress reporting code calls Report, it's just queueing the callback to the thread pool, and Report returns before the callback has run. So it's possible to see the callbacks out of order (or still coming in a bit after the download has actually finished).
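To see this queueing behaviour in isolation, here is a minimal sketch, assuming a plain console application where no SynchronizationContext is installed. The names ProgressOrderingDemo and Run are made up for this illustration. It sends a batch of reports, counts how many callbacks had run when the loop ended, then waits for the rest to drain.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical demo class; not part of the question's code.
public static class ProgressOrderingDemo
{
    // Returns { callbacks run when the loop ended, callbacks run after waiting }.
    public static int[] Run(int reports)
    {
        // Thread-safe collection: callbacks may run on several pool threads.
        var seen = new ConcurrentQueue<int>();
        IProgress<int> progress = new Progress<int>(value => seen.Enqueue(value));

        for (int i = 1; i <= reports; i++)
        {
            progress.Report(i); // returns immediately; the callback is only queued
        }

        // The loop is finished, but queued callbacks may still be pending.
        int ranWhenLoopEnded = seen.Count;

        // Wait for the thread pool to drain (for the demo only, not a fix).
        SpinWait.SpinUntil(() => seen.Count == reports, TimeSpan.FromSeconds(5));
        return new[] { ranWhenLoopEnded, seen.Count };
    }

    public static void Main()
    {
        int[] counts = Run(1000);
        Console.WriteLine("Callbacks run when the loop ended: " + counts[0]);
        Console.WriteLine("Callbacks run after waiting:       " + counts[1]);
    }
}
```

The first number varies from run to run and is often lower than the number of reports sent, which is the same effect as the progress lines that appear after "downloaded" in the question's output.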

To fix this, you can create your own custom implementation of IProgress<T>:

// C# 6.0 and later (expression-bodied member)
public sealed class SynchronousProgress<T> : IProgress<T>
{
  private readonly Action<T> _callback;
  public SynchronousProgress(Action<T> callback) { _callback = callback; }
  void IProgress<T>.Report(T data) => _callback(data);
}
// Earlier C# versions (the same class; use one definition or the other)
public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> _callback;

    public SynchronousProgress(Action<T> callback)
    {
        _callback = callback;
    }

    void IProgress<T>.Report(T data)
    {
        _callback(data);
    }
}

Then replace the line

IProgress<long> progress = new Progress<long>(value =>

with

IProgress<long> progress = new SynchronousProgress<long>(value =>
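As a rough check of the effect, the sketch below copies an in-memory stream with the question's extension method (minus the Thread.Sleep workaround) and a SynchronousProgress<long>. The MemoryStream source, the 10000-byte payload and the SynchronousProgressDemo class are assumptions made for this example only. Because Report now runs the callback inline, every report has been handled by the time the awaited copy completes.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> _callback;
    public SynchronousProgress(Action<T> callback) { _callback = callback; }
    void IProgress<T>.Report(T data) { _callback(data); } // runs on the caller's thread
}

public static class StreamExtensions
{
    // Same shape as the extension method in the question, without Thread.Sleep.
    public static async Task CopyToAsync(this Stream source, Stream destination, IProgress<long> progress,
        CancellationToken cancellationToken = default(CancellationToken), int bufferSize = 0x1000)
    {
        var buffer = new byte[bufferSize];
        int bytesRead;
        long totalRead = 0;
        while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
        {
            await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken);
            totalRead += bytesRead;
            progress.Report(totalRead); // callback has finished when this returns
        }
    }
}

// Hypothetical demo class; not part of the question's code.
public static class SynchronousProgressDemo
{
    public static async Task<List<long>> RunAsync()
    {
        var reported = new List<long>();
        IProgress<long> progress = new SynchronousProgress<long>(value => reported.Add(value));

        using (var source = new MemoryStream(new byte[10000]))
        using (var destination = new MemoryStream())
        {
            await source.CopyToAsync(destination, progress, CancellationToken.None, 0x1000);
        }

        // No callbacks are pending here: each one ran inside Report.
        return reported;
    }

    public static void Main()
    {
        List<long> reported = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine(string.Join(", ", reported)); // prints "4096, 8192, 10000"
    }
}
```

One trade-off to keep in mind: the callback now runs on whichever thread calls Report and blocks the copy loop while it executes, so it should stay short. The question's callback already takes ConsoleLock, which still matters when several downloads report at the same time.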