1 /* 2 * Copyright (c) 2017-2018 sel-project 3 * 4 * Permission is hereby granted, free of charge, to any person obtaining a copy 5 * of this software and associated documentation files (the "Software"), to deal 6 * in the Software without restriction, including without limitation the rights 7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 * copies of the Software, and to permit persons to whom the Software is 9 * furnished to do so, subject to the following conditions: 10 * 11 * The above copyright notice and this permission notice shall be included in all 12 * copies or substantial portions of the Software. 13 * 14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 20 * SOFTWARE. 21 * 22 */ 23 /** 24 * Copyright: Copyright (c) 2017-2018 sel-project 25 * License: MIT 26 * Authors: Kripth 27 * Source: $(HTTP github.com/sel-project/sel-net/sel/net/modifiers.d, sel/net/modifiers.d) 28 */ 29 module sel.net.modifiers; 30 31 import std.bitmanip : _write = write, _read = read; 32 import std.conv : to; 33 import std.socket : Socket, Address; 34 import std.system : Endian; 35 import std.traits : isNumeric, isIntegral, Parameters; 36 import std.zlib : Compress, UnCompress; 37 38 import sel.net.stream : Stream; 39 40 abstract class ModifierStream : Stream { 41 42 public Stream stream; 43 44 public this(Stream stream) { 45 super(stream.socket); 46 this.stream = stream; 47 } 48 49 public override ptrdiff_t send(ubyte[] buffer) { 50 return this.stream.send(buffer); 51 } 52 53 public override ubyte[] receive() { 54 return this.stream.receive(); 55 } 56 57 public override pure nothrow @property @safe @nogc ptrdiff_t lastRecv() { 58 return this.stream.lastRecv(); 59 } 60 61 } 62 63 class PaddedStream(size_t paddingIndex) : ModifierStream { 64 65 private ubyte[] padding; 66 67 public this(Stream stream, ubyte[] padding) { 68 super(stream); 69 } 70 71 public override ptrdiff_t send(ubyte[] payload) { 72 static if(paddingIndex == 0) { 73 payload = this.padding ~ payload; 74 } else { 75 payload = payload[0..paddingIndex] ~ this.padding ~ payload[paddingIndex..$]; 76 } 77 return super.send(payload); 78 } 79 80 public override ubyte[] receive() { 81 ubyte[] payload = super.receive(); 82 if(payload.length >= this.padding.length + paddingIndex) { 83 static if(paddingIndex == 0) { 84 return payload[this.padding.length..$]; 85 } else { 86 return payload[0..this.padding.length] ~ payload[this.padding.length..$]; 87 } 88 } 89 return []; 90 } 91 92 } 93 94 class LengthPrefixedStream(T, Endian endianness=Endian.bigEndian) : ModifierStream if(isNumeric!T || (is(typeof(T.encode)) && isIntegral!(Parameters!(T.encode)[0]))) { 95 96 static if(isNumeric!T) { 97 enum requiredSize = T.sizeof; 98 } else { 99 enum requiredSize = 1; 100 } 101 102 public size_t maxLength; 103 104 private ubyte[] next; 105 private size_t nextLength = 0; 106 107 public this(Stream stream, size_t maxLength=size_t.max) { 108 super(stream); 109 this.maxLength = maxLength; 110 } 111 112 /** 113 * Sends a buffer prefixing it with its length. 114 * Returns: the number of bytes sent 115 */ 116 public override ptrdiff_t send(ubyte[] payload) { 117 static if(isNumeric!T) { 118 immutable length = payload.length.to!T; 119 payload = new ubyte[requiredSize] ~ payload; 120 _write!(T, endianness)(payload, length, 0); 121 } else { 122 payload = T.encode(payload.length.to!(Parameters!(T.encode)[0])) ~ payload; 123 } 124 return super.send(payload); 125 } 126 127 /** 128 * Returns: an array of bytes as indicated by the length or an empty array on failure or when the indicated length exceeds the max length 129 */ 130 public override ubyte[] receive() { 131 return this.receiveImpl(); 132 } 133 134 private ubyte[] receiveImpl() { 135 if(this.nextLength == 0) { 136 // read length of the packet 137 while(this.next.length < requiredSize) { 138 if(!this.read()) return []; 139 } 140 static if(isNumeric!T) { 141 this.nextLength = _read!(T, endianness)(this.next); 142 } else { 143 this.nextLength = T.fromBuffer(this.next); 144 } 145 if(this.nextLength == 0 || this.nextLength > this.maxLength) { 146 // valid connection but unacceptable length 147 this.nextLength = 0; 148 return []; 149 } else { 150 return this.receiveImpl(); 151 } 152 } else { 153 // read the packet with the given length 154 while(this.next.length < this.nextLength) { 155 if(!this.read()) return []; 156 } 157 ubyte[] ret = this.next[0..this.nextLength]; 158 this.next = this.next[this.nextLength..$]; 159 this.nextLength = 0; 160 return ret; 161 } 162 } 163 164 /* 165 * Returns: true if some data has been received, false if the connection has been closed or timed out 166 */ 167 private bool read() { 168 ubyte[] recv = super.receive(); 169 if(this.lastRecv > 0) { 170 this.next ~= recv; 171 return true; 172 } else { 173 return false; 174 } 175 } 176 177 } 178 179 class CompressedStream(T) : ModifierStream { 180 181 private immutable size_t thresold; 182 183 public this(Stream stream, size_t thresold) { 184 super(stream); 185 this.thresold = thresold; 186 } 187 188 public override ptrdiff_t send(ubyte[] buffer) { 189 if(buffer.length >= this.thresold) { 190 auto compress = new Compress(); 191 auto data = compress.compress(buffer); 192 data ~= compress.flush(); 193 buffer = T.encode(buffer.length.to!uint) ~ cast(ubyte[])data; //TODO more types 194 } else { 195 buffer = ubyte.init ~ buffer; 196 } 197 return super.send(buffer); 198 } 199 200 public override ubyte[] receive() { 201 ubyte[] buffer = super.receive(); 202 uint length = T.fromBuffer(buffer); 203 if(length != 0) { 204 // compressed 205 auto uncompress = new UnCompress(length); 206 buffer = cast(ubyte[])uncompress.uncompress(buffer.dup); 207 buffer ~= cast(ubyte[])uncompress.flush(); 208 } 209 return buffer; 210 } 211 212 }