001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one or more
003 *  contributor license agreements.  See the NOTICE file distributed with
004 *  this work for additional information regarding copyright ownership.
005 *  The ASF licenses this file to You under the Apache License, Version 2.0
006 *  (the "License"); you may not use this file except in compliance with
007 *  the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 *
017 */
018package org.apache.commons.compress.archivers.zip;
019
020
021import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
022import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
023import org.apache.commons.compress.utils.BoundedInputStream;
024
025import java.io.Closeable;
026import java.io.File;
027import java.io.FileNotFoundException;
028import java.io.IOException;
029import java.io.InputStream;
030import java.util.Queue;
031import java.util.concurrent.ConcurrentLinkedQueue;
032import java.util.zip.Deflater;
033
034/**
035 * A zip output stream that is optimized for multi-threaded scatter/gather construction of zip files.
036 * <p>
037 * The internal data format of the entries used by this class are entirely private to this class
038 * and are not part of any public api whatsoever.
039 * </p>
040 * <p>It is possible to extend this class to support different kinds of backing storage, the default
041 * implementation only supports file-based backing.
042 * </p>
043 * Thread safety: This class supports multiple threads. But the "writeTo" method must be called
044 * by the thread that originally created the {@link ZipArchiveEntry}.
045 *
046 * @since 1.10
047 */
048public class ScatterZipOutputStream implements Closeable {
049    private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<>();
050    private final ScatterGatherBackingStore backingStore;
051    private final StreamCompressor streamCompressor;
052
053    private static class CompressedEntry {
054        final ZipArchiveEntryRequest zipArchiveEntryRequest;
055        final long crc;
056        final long compressedSize;
057        final long size;
058
059        public CompressedEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest, final long crc, final long compressedSize, final long size) {
060            this.zipArchiveEntryRequest = zipArchiveEntryRequest;
061            this.crc = crc;
062            this.compressedSize = compressedSize;
063            this.size = size;
064        }
065
066        /**
067         * Update the original {@link ZipArchiveEntry} with sizes/crc
068         * Do not use this methods from threads that did not create the instance itself !
069         * @return the zipArchiveEntry that is basis for this request
070         */
071
072        public ZipArchiveEntry transferToArchiveEntry(){
073            final ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry();
074            entry.setCompressedSize(compressedSize);
075            entry.setSize(size);
076            entry.setCrc(crc);
077            entry.setMethod(zipArchiveEntryRequest.getMethod());
078            return entry;
079        }
080    }
081
082    public ScatterZipOutputStream(final ScatterGatherBackingStore backingStore,
083                                  final StreamCompressor streamCompressor) {
084        this.backingStore = backingStore;
085        this.streamCompressor = streamCompressor;
086    }
087
088    /**
089     * Add an archive entry to this scatter stream.
090     *
091     * @param zipArchiveEntryRequest The entry to write.
092     * @throws IOException    If writing fails
093     */
094    public void addArchiveEntry(final ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException {
095        try (final InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream()) {
096            streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod());
097        }
098        items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(),
099                                      streamCompressor.getBytesWrittenForLastEntry(), streamCompressor.getBytesRead()));
100    }
101
102    /**
103     * Write the contents of this scatter stream to a target archive.
104     *
105     * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}.
106     * @throws IOException If writing fails
107     */
108    public void writeTo(final ZipArchiveOutputStream target) throws IOException {
109        backingStore.closeForWriting();
110        try (final InputStream data = backingStore.getInputStream()) {
111            for (final CompressedEntry compressedEntry : items) {
112                try (final BoundedInputStream rawStream = new BoundedInputStream(data,
113                        compressedEntry.compressedSize)) {
114                    target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream);
115                }
116            }
117        }
118    }
119
120
121    /**
122     * Closes this stream, freeing all resources involved in the creation of this stream.
123     * @throws IOException If closing fails
124     */
125    @Override
126    public void close() throws IOException {
127        try {
128            backingStore.close();
129        } finally {
130            streamCompressor.close();
131        }
132    }
133
134    /**
135     * Create a {@link ScatterZipOutputStream} with default compression level that is backed by a file
136     *
137     * @param file The file to offload compressed data into.
138     * @return A ScatterZipOutputStream that is ready for use.
139     * @throws FileNotFoundException if the file cannot be found
140     */
141    public static ScatterZipOutputStream fileBased(final File file) throws FileNotFoundException {
142        return fileBased(file, Deflater.DEFAULT_COMPRESSION);
143    }
144
145    /**
146     * Create a {@link ScatterZipOutputStream} that is backed by a file
147     *
148     * @param file             The file to offload compressed data into.
149     * @param compressionLevel The compression level to use, @see #Deflater
150     * @return A  ScatterZipOutputStream that is ready for use.
151     * @throws FileNotFoundException if the file cannot be found
152     */
153    public static ScatterZipOutputStream fileBased(final File file, final int compressionLevel) throws FileNotFoundException {
154        final ScatterGatherBackingStore bs = new FileBasedScatterGatherBackingStore(file);
155        // lifecycle is bound to the ScatterZipOutputStream returned
156        final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); //NOSONAR
157        return new ScatterZipOutputStream(bs, sc);
158    }
159}