I created this to test out a parallel extract:
public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)
{
ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
{
var path = Path.Combine(folder.FullName, entry.FullName);
Directory.CreateDirectory(Path.GetDirectoryName(path));
entry.ExtractToFile(path);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
using (var archive = ZipFile.OpenRead(file.FullName))
{
foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
{
block.Post(entry);
}
block.Complete();
await block.Completion;
}
}
and the following unit test for testing:
[TestMethod]
public async Task ExtractTestAsync()
{
if (Resources.LocalExtractFolder.Exists)
Resources.LocalExtractFolder.Delete(true);
// Resources.LocalExtractFolder.Create();
await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
}
With MaxDegreeOfParallelism = 1, things work but with 2 it do not.
Test Name: ExtractTestAsync
Test FullName: Composite.Azure.Tests.ZipFileTests.ExtractTestAsync
Test Source: c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21
Test Outcome: Failed
Test Duration: 0:00:02.4138753
Result Message:
Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception:
System.IO.InvalidDataException: Unknown block type. Stream might be corrupted.
Result StackTrace:
at System.IO.Compression.Inflater.Decode()
at System.IO.Compression.Inflater.Inflate(Byte[] bytes, Int32 offset, Int32 length)
at System.IO.Compression.DeflateStream.Read(Byte[] array, Int32 offset, Int32 count)
at System.IO.Stream.InternalCopyTo(Stream destination, Int32 bufferSize)
at System.IO.Stream.CopyTo(Stream destination)
at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName, Boolean overwrite)
at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName)
at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37
at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action, KeyValuePair`2 messageWithId)
at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId)
at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
Here is a my own go at doing it parallel, it dont work either :) Remember to handle exceptions in the continueWith.
public static void ExtractToDirectorySemaphore(this FileInfo file, DirectoryInfo folder)
{
int MaxDegreeOfParallelism = 2;
using (var archive = ZipFile.OpenRead(file.FullName))
{
var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);
foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
{
semaphore.WaitOne();
var task = Task.Run(() =>
{
var path = Path.Combine(folder.FullName, entry.FullName);
Directory.CreateDirectory(Path.GetDirectoryName(path));
entry.ExtractToFile(path);
});
task.ContinueWith(handle =>
{
try
{
//do any cleanup/post processing
}
finally
{
// Release the semaphore so the next thing can be processed
semaphore.Release();
}
});
}
while(MaxDegreeOfParallelism-->0)
semaphore.WaitOne(); //Wait here until the last task completes.
}
}
And here is the async version:
public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file, DirectoryInfo folder)
{
return Task.Factory.StartNew(() =>
{
int MaxDegreeOfParallelism = 50;
using (var archive = ZipFile.OpenRead(file.FullName))
{
var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);
foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
{
semaphore.WaitOne();
var task = Task.Run(() =>
{
var path = Path.Combine(folder.FullName, entry.FullName);
Directory.CreateDirectory(Path.GetDirectoryName(path));
entry.ExtractToFile(path);
});
task.ContinueWith(handle =>
{
try
{
//do any cleanup/post processing
}
finally
{
// Release the semaphore so the next thing can be processed
semaphore.Release();
}
},TaskContinuationOptions.AttachedToParent); // the outher task will wait for all.
}
}
});
}
The following exceptions is thrown in the handle.Exception.
{"Block length does not match with its complement."}
[0] = {"A local file header is corrupt."}
Have to find out if ZipFile is thread safe or not.
Disclamer: Its only a proof of concept.
Replacing ZipFile.OpenRead with ParallelZipFile.OpenRead in the samples in the code all 4 unittests passes.
public class ParallelZipFile
{
public static ParallelZipArchive OpenRead(string path)
{
return new ParallelZipArchive(ZipFile.OpenRead(path),path);
}
}
public class ParallelZipArchive : IDisposable
{
internal ZipArchive _archive;
internal string _path;
internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();
public ParallelZipArchive(ZipArchive zip,string path)
{
_path = path;
_archive = zip;
FreeReaders.Enqueue(zip);
}
public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
{
get
{
var list = new List<ParallelZipArchiveEntry>(_archive.Entries.Count);
int i = 0;
foreach (var entry in _archive.Entries)
list.Add(new ParallelZipArchiveEntry(i++, entry, this));
return new ReadOnlyCollection<ParallelZipArchiveEntry>(list);
}
}
public void Dispose()
{
foreach (var archive in FreeReaders)
archive.Dispose();
}
}
public class ParallelZipArchiveEntry
{
private ParallelZipArchive _parent;
private int _entry;
public string Name { get; set; }
public string FullName { get; set; }
public ParallelZipArchiveEntry(int entryNr, ZipArchiveEntry entry, ParallelZipArchive parent)
{
_entry = entryNr;
_parent = parent;
Name = entry.Name;
FullName = entry.FullName;
}
public void ExtractToFile(string path)
{
ZipArchive value;
Trace.TraceInformation(string.Format("Number of readers: {0}", _parent.FreeReaders.Count));
if (!_parent.FreeReaders.TryDequeue(out value))
value = ZipFile.OpenRead(_parent._path);
value.Entries.Skip(_entry).First().ExtractToFile(path);
_parent.FreeReaders.Enqueue(value);
}
}
[TestClass]
public class ZipFileTests
{
[ClassInitialize()]
public static void PreInitialize(TestContext context)
{
if (Resources.LocalExtractFolderTruth.Exists)
Resources.LocalExtractFolderTruth.Delete(true);
ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName, Resources.LocalExtractFolderTruth.FullName);
}
[TestInitialize()]
public void InitializeTests()
{
if (Resources.LocalExtractFolder.Exists)
Resources.LocalExtractFolder.Delete(true);
}
[TestMethod]
public void ExtractTest()
{
Resources.WebsiteZip.ExtractToDirectory(Resources.LocalExtractFolder);
Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
}
[TestMethod]
public async Task ExtractAsyncTest()
{
await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
}
[TestMethod]
public void ExtractSemaphoreTest()
{
Resources.WebsiteZip.ExtractToDirectorySemaphore(Resources.LocalExtractFolder);
Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
}
[TestMethod]
public async Task ExtractSemaphoreAsyncTest()
{
await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);
Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
}
}