001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.utils;
020
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.nio.ByteBuffer;
025import java.nio.ByteOrder;
026import java.nio.channels.ClosedChannelException;
027import java.nio.channels.WritableByteChannel;
028import java.util.concurrent.atomic.AtomicBoolean;
029
/**
 * This class supports writing to an OutputStream or WritableByteChannel in fixed length blocks.
 * <p>It can be used to support output to devices such as tape drives that require output in this
 * format. If the final block does not have enough content to fill an entire block, the output will
 * be padded with zero bytes to a full block size.</p>
 *
 * <p>This class can be used to support TAR, PAX, and CPIO blocked output to character special devices.
 * It is not recommended that this class be used unless writing to such devices, as the padding
 * serves no useful purpose for other kinds of destination.</p>
 *
 * <p>This class should normally wrap a FileOutputStream or associated WritableByteChannel directly.
 * If there is an intervening filter that modifies the output, such as a CompressorOutputStream, or
 * performs its own buffering, such as BufferedOutputStream, output to the device may
 * no longer be of the specified size.</p>
 *
 * <p>Any content written to this stream should be self-delimiting and should tolerate any padding
 * added to fill the last block.</p>
 *
 * <p>Access to the internal block buffer is not synchronized; only the closed flag is atomic.</p>
 *
 * @since 1.15
 */
public class FixedLengthBlockOutputStream extends OutputStream implements WritableByteChannel {

    /** Destination channel; every write issued to it is exactly one block long. */
    private final WritableByteChannel out;
    /** Size in bytes of each block handed to {@link #out}. */
    private final int blockSize;
    /** Accumulates bytes until a complete block is available. */
    private final ByteBuffer buffer;
    /** Set once this stream has been closed (or the destination was found closed). */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Create a fixed length block output stream with given destination stream and block size.
     *
     * <p>A {@link FileOutputStream} is written through its channel using a direct buffer; any other
     * stream is wrapped in a channel that forwards each block with a single {@code write} call.</p>
     *
     * @param os   The stream to wrap.
     * @param blockSize The block size to use.
     * @throws IllegalArgumentException if {@code blockSize} is not positive
     */
    public FixedLengthBlockOutputStream(OutputStream os, int blockSize) {
        this.blockSize = validateBlockSize(blockSize);
        if (os instanceof FileOutputStream) {
            out = ((FileOutputStream) os).getChannel();
            buffer = ByteBuffer.allocateDirect(blockSize);
        } else {
            out = new BufferAtATimeOutputChannel(os);
            buffer = ByteBuffer.allocate(blockSize);
        }
    }

    /**
     * Create a fixed length block output stream with given destination writable byte channel and block size.
     *
     * @param out   The writable byte channel to wrap.
     * @param blockSize The block size to use.
     * @throws IllegalArgumentException if {@code blockSize} is not positive
     */
    public FixedLengthBlockOutputStream(WritableByteChannel out, int blockSize) {
        this.blockSize = validateBlockSize(blockSize);
        this.out = out;
        this.buffer = ByteBuffer.allocateDirect(blockSize);
    }

    /**
     * Rejects block sizes that cannot work: with a zero-sized block the write loops could never
     * make progress.
     */
    private static int validateBlockSize(int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("Block size must be positive, but was " + blockSize);
        }
        return blockSize;
    }

    /** Builds the exception reported when the destination did not accept a whole block at once. */
    private IOException incompleteWrite(int written) {
        String msg = String
            .format("Failed to write %,d bytes atomically. Only wrote  %,d",
                blockSize, written);
        return new IOException(msg);
    }

    /** Writes the buffered block out as soon as the buffer is completely full. */
    private void maybeFlush() throws IOException {
        if (!buffer.hasRemaining()) {
            writeBlock();
        }
    }

    /**
     * Writes the content of the (full) buffer as one block and resets the buffer for reuse.
     *
     * @throws IOException if the destination did not consume exactly one block
     */
    private void writeBlock() throws IOException {
        buffer.flip();
        int i = out.write(buffer);
        if (i != blockSize || buffer.hasRemaining()) {
            throw incompleteWrite(i);
        }
        buffer.clear();
    }

    /**
     * Buffers a single byte, writing a block if this byte completes one.
     *
     * @param b the byte to write (only the low eight bits are used)
     * @throws ClosedChannelException if this stream or its destination is closed
     * @throws IOException if writing a completed block fails
     */
    @Override
    public void write(int b) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        buffer.put((byte) b);
        maybeFlush();
    }

    /**
     * Buffers {@code length} bytes from {@code b}, writing every block that becomes complete.
     *
     * @param b source array
     * @param offset index of the first byte to write
     * @param length number of bytes to write
     * @throws ClosedChannelException if this stream or its destination is closed
     * @throws IOException if writing a completed block fails
     */
    @Override
    public void write(byte[] b, final int offset, final int length) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        int off = offset;
        int len = length;
        while (len > 0) {
            // copy as much as fits into the current block, then flush it if it became full
            int n = Math.min(len, buffer.remaining());
            buffer.put(b, off, n);
            maybeFlush();
            len -= n;
            off += n;
        }
    }

    /**
     * Consumes all remaining bytes of {@code src}. Complete blocks are written to the destination
     * (directly from {@code src} where possible); a trailing partial block stays buffered.
     *
     * @param src the buffer to take the bytes from
     * @return the number of bytes consumed, which is always everything {@code src} had remaining
     * @throws ClosedChannelException if this stream or its destination is closed
     * @throws IOException if the destination does not accept a complete block in a single write
     */
    @Override
    public int write(ByteBuffer src) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        final int srcRemaining = src.remaining();

        if (srcRemaining >= buffer.remaining()) {
            int srcLeft = srcRemaining;
            final int savedLimit = src.limit();
            try {
                // If we're not at the start of buffer, we have some bytes already buffered:
                // fill up the rest of buffer and write the block.
                if (buffer.position() != 0) {
                    final int n = buffer.remaining();
                    src.limit(src.position() + n);
                    buffer.put(src);
                    writeBlock();
                    srcLeft -= n;
                }
                // Whilst we have enough bytes in src for complete blocks,
                // write them directly from src without copying them to buffer.
                while (srcLeft >= blockSize) {
                    src.limit(src.position() + blockSize);
                    final int written = out.write(src);
                    // same contract as writeBlock(): a block is written whole or not at all
                    if (written != blockSize || src.hasRemaining()) {
                        throw incompleteWrite(written);
                    }
                    srcLeft -= blockSize;
                }
            } finally {
                // never leave the caller's buffer with a truncated limit, even on failure
                src.limit(savedLimit);
            }
        }
        // whatever is left is less than a block (or fits the current one), so it must be buffered
        buffer.put(src);
        return srcRemaining;
    }

    /**
     * Reports whether this stream can still be written to. If the destination channel turns out
     * to be closed, this stream is marked closed as well.
     *
     * @return {@code true} if neither this stream nor its destination has been closed
     */
    @Override
    public boolean isOpen() {
        if (!out.isOpen()) {
            closed.set(true);
        }
        return !closed.get();
    }

    /**
     * Potentially pads and then writes the current block to the underlying stream.
     * Does nothing if no bytes are currently buffered.
     *
     * @throws IOException if writing fails
     */
    public void flushBlock() throws IOException {
        if (buffer.position() != 0) {
            padBlock();
            writeBlock();
        }
    }

    /**
     * Writes any buffered bytes as a final zero-padded block and closes the destination.
     * Only the first call has an effect; the destination is closed even if the final write fails.
     *
     * @throws IOException if writing the final block or closing the destination fails
     */
    @Override
    public void close() throws IOException {
        if (closed.compareAndSet(false, true)) {
            try {
                flushBlock();
            } finally {
                out.close();
            }
        }
    }

    /**
     * Fills the unused remainder of the buffer with zero bytes. Larger gaps are filled eight
     * bytes at a time once the position has been advanced to a multiple of eight.
     */
    private void padBlock() {
        buffer.order(ByteOrder.nativeOrder());
        int bytesToWrite = buffer.remaining();
        if (bytesToWrite > 8) {
            // single bytes until the position is a multiple of 8
            int align = buffer.position() & 7;
            if (align != 0) {
                int limit = 8 - align;
                for (int i = 0; i < limit; i++) {
                    buffer.put((byte) 0);
                }
                bytesToWrite -= limit;
            }

            while (bytesToWrite >= 8) {
                buffer.putLong(0L);
                bytesToWrite -= 8;
            }
        }
        // fewer than eight bytes left (or a small gap from the start)
        while (buffer.hasRemaining()) {
            buffer.put((byte) 0);
        }
    }

    /**
     * Helper class to provide channel wrapper for arbitrary output stream that doesn't alter the
     * size of writes.  We can't use Channels.newChannel, because for non FileOutputStreams, it
     * breaks up writes into 8KB max chunks. Since the purpose of this class is to always write
     * complete blocks, we need to write a simple class to take care of it.
     */
    private static class BufferAtATimeOutputChannel implements WritableByteChannel {

        private final OutputStream out;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        private BufferAtATimeOutputChannel(OutputStream out) {
            this.out = out;
        }

        /**
         * Writes everything remaining in {@code buffer} to the stream with a single
         * {@code write} call. Buffers without an accessible backing array (direct or read-only
         * ones) are copied into a temporary array first. The channel is closed if the stream
         * reports an error.
         *
         * @param buffer the bytes to write
         * @return the number of bytes written, i.e. what {@code buffer} had remaining
         * @throws ClosedChannelException if this channel has been closed
         * @throws IOException if the underlying stream fails
         */
        @Override
        public int write(ByteBuffer buffer) throws IOException {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }

            final int len = buffer.remaining();
            try {
                if (buffer.hasArray()) {
                    out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), len);
                } else {
                    // read through a duplicate so the position only advances after a successful write
                    final byte[] copy = new byte[len];
                    buffer.duplicate().get(copy);
                    out.write(copy, 0, len);
                }
                buffer.position(buffer.limit());
                return len;
            } catch (IOException e) {
                try {
                    close();
                } catch (IOException ignored) { //NOSONAR
                    // the original failure is the one worth reporting
                }
                throw e;
            }
        }

        @Override
        public boolean isOpen() {
            return !closed.get();
        }

        /** Closes the wrapped stream; only the first call has an effect. */
        @Override
        public void close() throws IOException {
            if (closed.compareAndSet(false, true)) {
                out.close();
            }
        }

    }


}