1+ using System ;
2+ using System . Buffers ;
3+ using System . Reactive . Linq ;
4+ using ZLibDotNet ;
5+
6+ namespace BlockCompressor ;
7+
8+ public static class Compressed
9+ {
10+ /// <summary>
11+ /// Default block size (64KB)
12+ /// </summary>
13+ public const int DefaultBlockSize = 64 * 1024 ; // 64KB
14+
15+ /// <summary>
16+ /// Compresses an observable stream of byte arrays into blocks using zlib's deflate algorithm
17+ /// configured in the style of makemsix/makeappx
18+ /// </summary>
19+ public static IObservable < DeflateBlock > Blocks (
20+ IObservable < byte [ ] > input ,
21+ int compressionLevel = ZLib . Z_BEST_COMPRESSION ,
22+ int uncompressedBlockSize = DefaultBlockSize )
23+ {
24+ return Observable . Create < DeflateBlock > ( observer =>
25+ {
26+ // Use ArrayPool to reduce GC pressure
27+ byte [ ] rentedBuffer = ArrayPool < byte > . Shared . Rent ( uncompressedBlockSize * 2 ) ;
28+ int bytesInBuffer = 0 ;
29+
30+ var wrapper = new ZStreamWrapper ( ) ;
31+
32+ var subscription = input . Subscribe (
33+ onNext : incomingBytes =>
34+ {
35+ try
36+ {
37+ // Check if we need a larger buffer
38+ if ( bytesInBuffer + incomingBytes . Length > rentedBuffer . Length )
39+ {
40+ // Get a new buffer of appropriate size
41+ int newSize = Math . Max ( rentedBuffer . Length * 2 , bytesInBuffer + incomingBytes . Length ) ;
42+ byte [ ] newBuffer = ArrayPool < byte > . Shared . Rent ( newSize ) ;
43+
44+ // Copy existing data to new buffer
45+ Buffer . BlockCopy ( rentedBuffer , 0 , newBuffer , 0 , bytesInBuffer ) ;
46+
47+ // Return old buffer to pool
48+ ArrayPool < byte > . Shared . Return ( rentedBuffer ) ;
49+
50+ // Use new buffer
51+ rentedBuffer = newBuffer ;
52+ }
53+
54+ // Copy incoming bytes to our buffer
55+ Buffer . BlockCopy ( incomingBytes , 0 , rentedBuffer , bytesInBuffer , incomingBytes . Length ) ;
56+ bytesInBuffer += incomingBytes . Length ;
57+
58+ // Process complete blocks
59+ ProcessCompleteBlocks ( ) ;
60+ }
61+ catch ( Exception ex )
62+ {
63+ observer . OnError ( ex ) ;
64+ }
65+ } ,
66+ onError : ex =>
67+ {
68+ try
69+ {
70+ ArrayPool < byte > . Shared . Return ( rentedBuffer ) ;
71+ }
72+ finally
73+ {
74+ observer . OnError ( ex ) ;
75+ }
76+ } ,
77+ onCompleted : ( ) =>
78+ {
79+ try
80+ {
81+ // Process any remaining bytes if we have some
82+ if ( bytesInBuffer > 0 )
83+ {
84+ ProcessBlock ( rentedBuffer . AsSpan ( 0 , bytesInBuffer ) . ToArray ( ) ) ;
85+ }
86+
87+ // Process the final zlib block
88+ byte [ ] finalData = wrapper . DeflateFinish ( ) ;
89+
90+ if ( finalData . Length > 0 )
91+ {
92+ var finalBlock = new DeflateBlock
93+ {
94+ CompressedData = finalData ,
95+ OriginalData = [ ] ,
96+ } ;
97+
98+ observer . OnNext ( finalBlock ) ;
99+ }
100+
101+ observer . OnCompleted ( ) ;
102+ }
103+ catch ( Exception ex )
104+ {
105+ observer . OnError ( ex ) ;
106+ }
107+ finally
108+ {
109+ // Return the buffer to the pool
110+ ArrayPool < byte > . Shared . Return ( rentedBuffer ) ;
111+ }
112+ } ) ;
113+
114+ // Local function to process all complete blocks
115+ void ProcessCompleteBlocks ( )
116+ {
117+ // Process as many complete blocks as we can
118+ while ( bytesInBuffer >= uncompressedBlockSize )
119+ {
120+ // Create a block from the buffer
121+ byte [ ] blockData = new byte [ uncompressedBlockSize ] ;
122+ Buffer . BlockCopy ( rentedBuffer , 0 , blockData , 0 , uncompressedBlockSize ) ;
123+
124+ // Process it
125+ ProcessBlock ( blockData ) ;
126+
127+ // Move remaining data to the start of the buffer
128+ if ( bytesInBuffer > uncompressedBlockSize )
129+ {
130+ Buffer . BlockCopy (
131+ rentedBuffer ,
132+ uncompressedBlockSize ,
133+ rentedBuffer ,
134+ 0 ,
135+ bytesInBuffer - uncompressedBlockSize ) ;
136+ }
137+
138+ bytesInBuffer -= uncompressedBlockSize ;
139+ }
140+ }
141+
142+ // Local function to process a single block
143+ void ProcessBlock ( byte [ ] blockData )
144+ {
145+ try
146+ {
147+ // Compress the block using our wrapper
148+ byte [ ] compressedData = wrapper . Deflate (
149+ blockData ,
150+ compressionLevel ) ;
151+
152+ // Create and emit the compressed block
153+ var block = new DeflateBlock
154+ {
155+ CompressedData = compressedData ,
156+ OriginalData = blockData ,
157+ } ;
158+ observer . OnNext ( block ) ;
159+ }
160+ catch ( Exception ex )
161+ {
162+ observer . OnError ( ex ) ;
163+ }
164+ }
165+
166+ return subscription ;
167+ } ) ;
168+ }
169+ }
0 commit comments