001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 *
017 */
018package org.apache.commons.compress.archivers.zip;
019
020import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
021
022import java.io.Closeable;
023import java.io.DataOutput;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.nio.ByteBuffer;
028import java.nio.channels.SeekableByteChannel;
029import java.util.zip.CRC32;
030import java.util.zip.Deflater;
031import java.util.zip.ZipEntry;
032
033/**
034 * Encapsulates a {@link Deflater} and crc calculator, handling multiple types of output streams.
035 * Currently {@link java.util.zip.ZipEntry#DEFLATED} and {@link java.util.zip.ZipEntry#STORED} are the only
036 * supported compression methods.
037 *
038 * @since 1.10
039 */
040public abstract class StreamCompressor implements Closeable {
041
042    /*
043     * Apparently Deflater.setInput gets slowed down a lot on Sun JVMs
044     * when it gets handed a really big buffer.  See
045     * https://issues.apache.org/bugzilla/show_bug.cgi?id=45396
046     *
047     * Using a buffer size of 8 kB proved to be a good compromise
048     */
049    private static final int DEFLATER_BLOCK_SIZE = 8192;
050
051    private final Deflater def;
052
053    private final CRC32 crc = new CRC32();
054
055    private long writtenToOutputStreamForLastEntry = 0;
056    private long sourcePayloadLength = 0;
057    private long totalWrittenToOutputStream = 0;
058
059    private static final int BUFFER_SIZE = 4096;
060    private final byte[] outputBuffer = new byte[BUFFER_SIZE];
061    private final byte[] readerBuf = new byte[BUFFER_SIZE];
062
063    StreamCompressor(final Deflater deflater) {
064        this.def = deflater;
065    }
066
067    /**
068     * Create a stream compressor with the given compression level.
069     *
070     * @param os       The stream to receive output
071     * @param deflater The deflater to use
072     * @return A stream compressor
073     */
074    static StreamCompressor create(final OutputStream os, final Deflater deflater) {
075        return new OutputStreamCompressor(deflater, os);
076    }
077
078    /**
079     * Create a stream compressor with the default compression level.
080     *
081     * @param os The stream to receive output
082     * @return A stream compressor
083     */
084    static StreamCompressor create(final OutputStream os) {
085        return create(os, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
086    }
087
088    /**
089     * Create a stream compressor with the given compression level.
090     *
091     * @param os       The DataOutput to receive output
092     * @param deflater The deflater to use for the compressor
093     * @return A stream compressor
094     */
095    static StreamCompressor create(final DataOutput os, final Deflater deflater) {
096        return new DataOutputCompressor(deflater, os);
097    }
098
099    /**
100     * Create a stream compressor with the given compression level.
101     *
102     * @param os       The SeekableByteChannel to receive output
103     * @param deflater The deflater to use for the compressor
104     * @return A stream compressor
105     * @since 1.13
106     */
107    static StreamCompressor create(final SeekableByteChannel os, final Deflater deflater) {
108        return new SeekableByteChannelCompressor(deflater, os);
109    }
110
111    /**
112     * Create a stream compressor with the given compression level.
113     *
114     * @param compressionLevel The {@link Deflater}  compression level
115     * @param bs               The ScatterGatherBackingStore to receive output
116     * @return A stream compressor
117     */
118    public static StreamCompressor create(final int compressionLevel, final ScatterGatherBackingStore bs) {
119        final Deflater deflater = new Deflater(compressionLevel, true);
120        return new ScatterGatherBackingStoreCompressor(deflater, bs);
121    }
122
123    /**
124     * Create a stream compressor with the default compression level.
125     *
126     * @param bs The ScatterGatherBackingStore to receive output
127     * @return A stream compressor
128     */
129    public static StreamCompressor create(final ScatterGatherBackingStore bs) {
130        return create(Deflater.DEFAULT_COMPRESSION, bs);
131    }
132
133    /**
134     * The crc32 of the last deflated file
135     *
136     * @return the crc32
137     */
138
139    public long getCrc32() {
140        return crc.getValue();
141    }
142
143    /**
144     * Return the number of bytes read from the source stream
145     *
146     * @return The number of bytes read, never negative
147     */
148    public long getBytesRead() {
149        return sourcePayloadLength;
150    }
151
152    /**
153     * The number of bytes written to the output for the last entry
154     *
155     * @return The number of bytes, never negative
156     */
157    public long getBytesWrittenForLastEntry() {
158        return writtenToOutputStreamForLastEntry;
159    }
160
161    /**
162     * The total number of bytes written to the output for all files
163     *
164     * @return The number of bytes, never negative
165     */
166    public long getTotalBytesWritten() {
167        return totalWrittenToOutputStream;
168    }
169
170
171    /**
172     * Deflate the given source using the supplied compression method
173     *
174     * @param source The source to compress
175     * @param method The #ZipArchiveEntry compression method
176     * @throws IOException When failures happen
177     */
178
179    public void deflate(final InputStream source, final int method) throws IOException {
180        reset();
181        int length;
182
183        while ((length = source.read(readerBuf, 0, readerBuf.length)) >= 0) {
184            write(readerBuf, 0, length, method);
185        }
186        if (method == ZipEntry.DEFLATED) {
187            flushDeflater();
188        }
189    }
190
191    /**
192     * Writes bytes to ZIP entry.
193     *
194     * @param b      the byte array to write
195     * @param offset the start position to write from
196     * @param length the number of bytes to write
197     * @param method the comrpession method to use
198     * @return the number of bytes written to the stream this time
199     * @throws IOException on error
200     */
201    long write(final byte[] b, final int offset, final int length, final int method) throws IOException {
202        final long current = writtenToOutputStreamForLastEntry;
203        crc.update(b, offset, length);
204        if (method == ZipEntry.DEFLATED) {
205            writeDeflated(b, offset, length);
206        } else {
207            writeCounted(b, offset, length);
208        }
209        sourcePayloadLength += length;
210        return writtenToOutputStreamForLastEntry - current;
211    }
212
213
214    void reset() {
215        crc.reset();
216        def.reset();
217        sourcePayloadLength = 0;
218        writtenToOutputStreamForLastEntry = 0;
219    }
220
221    @Override
222    public void close() throws IOException {
223        def.end();
224    }
225
226    void flushDeflater() throws IOException {
227        def.finish();
228        while (!def.finished()) {
229            deflate();
230        }
231    }
232
233    private void writeDeflated(final byte[] b, final int offset, final int length)
234            throws IOException {
235        if (length > 0 && !def.finished()) {
236            if (length <= DEFLATER_BLOCK_SIZE) {
237                def.setInput(b, offset, length);
238                deflateUntilInputIsNeeded();
239            } else {
240                final int fullblocks = length / DEFLATER_BLOCK_SIZE;
241                for (int i = 0; i < fullblocks; i++) {
242                    def.setInput(b, offset + i * DEFLATER_BLOCK_SIZE,
243                            DEFLATER_BLOCK_SIZE);
244                    deflateUntilInputIsNeeded();
245                }
246                final int done = fullblocks * DEFLATER_BLOCK_SIZE;
247                if (done < length) {
248                    def.setInput(b, offset + done, length - done);
249                    deflateUntilInputIsNeeded();
250                }
251            }
252        }
253    }
254
255    private void deflateUntilInputIsNeeded() throws IOException {
256        while (!def.needsInput()) {
257            deflate();
258        }
259    }
260
261    void deflate() throws IOException {
262        final int len = def.deflate(outputBuffer, 0, outputBuffer.length);
263        if (len > 0) {
264            writeCounted(outputBuffer, 0, len);
265        }
266    }
267
268    public void writeCounted(final byte[] data) throws IOException {
269        writeCounted(data, 0, data.length);
270    }
271
272    public void writeCounted(final byte[] data, final int offset, final int length) throws IOException {
273        writeOut(data, offset, length);
274        writtenToOutputStreamForLastEntry += length;
275        totalWrittenToOutputStream += length;
276    }
277
278    protected abstract void writeOut(byte[] data, int offset, int length) throws IOException;
279
280    private static final class ScatterGatherBackingStoreCompressor extends StreamCompressor {
281        private final ScatterGatherBackingStore bs;
282
283        public ScatterGatherBackingStoreCompressor(final Deflater deflater, final ScatterGatherBackingStore bs) {
284            super(deflater);
285            this.bs = bs;
286        }
287
288        @Override
289        protected final void writeOut(final byte[] data, final int offset, final int length)
290                throws IOException {
291            bs.writeOut(data, offset, length);
292        }
293    }
294
295    private static final class OutputStreamCompressor extends StreamCompressor {
296        private final OutputStream os;
297
298        public OutputStreamCompressor(final Deflater deflater, final OutputStream os) {
299            super(deflater);
300            this.os = os;
301        }
302
303        @Override
304        protected final void writeOut(final byte[] data, final int offset, final int length)
305                throws IOException {
306            os.write(data, offset, length);
307        }
308    }
309
310    private static final class DataOutputCompressor extends StreamCompressor {
311        private final DataOutput raf;
312
313        public DataOutputCompressor(final Deflater deflater, final DataOutput raf) {
314            super(deflater);
315            this.raf = raf;
316        }
317
318        @Override
319        protected final void writeOut(final byte[] data, final int offset, final int length)
320                throws IOException {
321            raf.write(data, offset, length);
322        }
323    }
324
325    private static final class SeekableByteChannelCompressor extends StreamCompressor {
326        private final SeekableByteChannel channel;
327
328        public SeekableByteChannelCompressor(final Deflater deflater,
329                                             final SeekableByteChannel channel) {
330            super(deflater);
331            this.channel = channel;
332        }
333
334        @Override
335        protected final void writeOut(final byte[] data, final int offset, final int length)
336                throws IOException {
337            channel.write(ByteBuffer.wrap(data, offset, length));
338        }
339    }
340}