/*
 * Copyright (c) 2017-2018 sel-project
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
/**
 * Copyright: Copyright (c) 2017-2018 sel-project
 * License: MIT
 * Authors: Kripth
 * Source: $(HTTP github.com/sel-project/sel-net/sel/net/stream.d, sel/net/stream.d)
 */
module sel.net.stream;

import core.atomic : atomicOp;

import std.algorithm : min;
import std.bitmanip : littleEndianToNative, nativeToLittleEndian, nativeToBigEndian, peek;
import std.conv : to;
import std.math : ceil;
import std.socket : Address, Socket;

/**
 * Generic abstract stream. It stores a socket.
 */
class Stream {

	/**
	 * Socket used for writing and reading data.
	 */
	public Socket socket;

	/// Raw result of the last receive call; -1 until a receive has been performed.
	protected ptrdiff_t last_recv = -1;

	/**
	 * Params:
	 * 		socket = the socket used by `send` and `receive`
	 */
	public this(Socket socket) {
		this.socket = socket;
	}

	/**
	 * Sends bytes to the connected socket.
	 * Returns: the number of bytes sent.
	 */
	public abstract ptrdiff_t send(ubyte[] buffer);

	/**
	 * Receives bytes from the connected socket.
	 * Returns: the received data or an empty array on failure.
	 */
	public abstract ubyte[] receive();

	/**
	 * Indicates the result of the last receive call performed
	 * on the connected socket.
	 */
	public pure nothrow @property @safe @nogc ptrdiff_t lastRecv() {
		return this.last_recv;
	}

}

/**
 * Stream optimised for TCP connections.
 * The given socket should be blocking.
 */
class TcpStream : Stream {

	/// Reusable receive buffer; its length is the maximum size of a single read.
	private ubyte[] buffer;

	/**
	 * Params:
	 * 		socket = the socket used for sending and receiving
	 * 		bufferSize = size, in bytes, of the internal receive buffer
	 */
	public this(Socket socket, size_t bufferSize=4096) {
		super(socket);
		this.buffer = new ubyte[bufferSize];
	}

	/**
	 * Sends a full payload (even when it is bigger than the send buffer)
	 * and only returns when it is sent or on failure.
	 * Returns: the number of bytes sent (if not payload.length, an error has occurred).
	 */
	public override ptrdiff_t send(ubyte[] payload) {
		size_t sent = 0;
		while(sent < payload.length) {
			auto s = this.socket.send(payload[sent..$]);
			// zero or a negative value means nothing more could be written
			if(s <= 0) break;
			sent += s;
		}
		return sent;
	}

	/**
	 * Performs a single receive call on the socket, reading at most as many
	 * bytes as the internal buffer can hold.
	 * Returns: a copy of the received data, or an empty array when the
	 * receive call did not return a positive number of bytes.
	 */
	public override ubyte[] receive() {
		this.last_recv = this.socket.receive(this.buffer);
		if(this.last_recv > 0) {
			// copy: the internal buffer is overwritten by the next call
			return this.buffer[0..this.last_recv].dup;
		} else {
			return [];
		}
	}

}

/**
 * Stream optimised for UDP connections.
 */
class UdpStream : Stream {

	/// Destination used by `send`; also overwritten with the sender by `receive`.
	private Address address;

	/// Reusable receive buffer; its length is the maximum size of a single datagram read.
	private ubyte[] buffer;

	/**
	 * Params:
	 * 		socket = the socket used for sending and receiving
	 * 		address = default destination for `send`
	 * 		bufferSize = size, in bytes, of the internal receive buffer
	 */
	public this(Socket socket, Address address=null, size_t bufferSize=1492) {
		super(socket);
		// the argument was previously dropped, leaving `send` without a destination
		this.address = address;
		this.buffer = new ubyte[bufferSize];
	}

	/**
	 * Sends a datagram to the stored address.
	 * Returns: the result of the socket's sendTo call.
	 */
	public override ptrdiff_t send(ubyte[] buffer) {
		return this.socket.sendTo(buffer, this.address);
	}

	/**
	 * Sends a datagram to the given address without changing the stored one.
	 * Returns: the result of the socket's sendTo call.
	 */
	public ptrdiff_t sendTo(ubyte[] buffer, Address address) {
		return this.socket.sendTo(buffer, address);
	}

	/**
	 * Receives a datagram and stores the sender's address as the stored address.
	 * Returns: a copy of the received data, or an empty array when the
	 * receive call did not return a positive number of bytes.
	 */
	public override ubyte[] receive() {
		this.last_recv = this.socket.receiveFrom(this.buffer, this.address);
		if(this.last_recv > 0) {
			return this.buffer[0..this.last_recv].dup;
		} else {
			return [];
		}
	}

	/**
	 * Receives a datagram and writes the sender's address into `address`,
	 * leaving the stored address untouched.
	 * Returns: a copy of the received data, or an empty array when the
	 * receive call did not return a positive number of bytes.
	 */
	public ubyte[] receiveFrom(ref Address address) {
		this.last_recv = this.socket.receiveFrom(this.buffer, address);
		if(this.last_recv > 0) {
			return this.buffer[0..this.last_recv].dup;
		} else {
			return [];
		}
	}

}

/**
 * Stream that wraps payloads in numbered datagrams, acknowledges the
 * datagrams it receives, resends datagrams the peer reports as missing
 * and reassembles payloads that were split into several fragments.
 */
class RaknetStream : Stream {

	/// Peer address; overwritten with the sender's address by `receive`.
	private Address address;

	/// Payloads longer than this are split into fragments by `send`.
	public immutable size_t mtu;

	/// When false, incoming split fragments are discarded.
	public bool acceptSplit = true;

	/// Fragments received so far, indexed by split id and then by fragment order.
	private ubyte[][][ushort] splits;

	/// Number of distinct fragments received so far for each split id.
	private size_t[ushort] splitsCount;

	/// Reusable receive buffer (mtu + 128 bytes).
	private ubyte[] buffer;

	/// Sequence number of the last datagram sent; incremented atomically.
	private shared int send_count = -1;

	/// Split id used for the next fragmented payload.
	private ushort split_id = 0;

	/// Datagrams sent and not yet acknowledged, indexed by sequence number.
	private ubyte[][int] sent;

	/**
	 * Params:
	 * 		socket = the socket used for sending and receiving
	 * 		address = the peer's address
	 * 		mtu = maximum payload length sent in a single datagram
	 */
	public this(Socket socket, Address address, size_t mtu) {
		super(socket);
		this.address = address;
		this.mtu = mtu;
		this.buffer = new ubyte[mtu + 128];
	}

	/**
	 * Wraps the payload in one datagram, or in several fragments when it
	 * is longer than the mtu, and sends it to the peer. Every datagram is
	 * kept until it is acknowledged so that it can be resent.
	 * Returns: the sum of the results of the socket's sendTo calls
	 * (headers included).
	 */
	public override ptrdiff_t send(ubyte[] _buffer) {
		if(_buffer.length > this.mtu) {
			ptrdiff_t total = 0;
			immutable count = to!uint(ceil(_buffer.length.to!float / this.mtu));
			immutable sizes = to!uint(ceil(_buffer.length.to!float / count));
			foreach(order ; 0..count) {
				immutable c = atomicOp!"+="(this.send_count, 1);
				ubyte[] current = _buffer[order*sizes..min((order+1)*sizes, $)];
				ubyte[3] _count = nativeToLittleEndian(c)[0..3];
				ubyte[] packet = [ubyte(140)];
				packet ~= _count; // sequence number
				packet ~= ubyte(64 | 16); // info: 16 is the split flag
				packet ~= nativeToBigEndian(cast(ushort)(current.length * 8)); // length in bits
				packet ~= _count; // message index
				packet ~= nativeToBigEndian(count);
				packet ~= nativeToBigEndian(this.split_id);
				packet ~= nativeToBigEndian(order);
				packet ~= current;
				total += this.socket.sendTo(packet, this.address);
				this.sent[c] = packet;
			}
			this.split_id++;
			return total;
		} else {
			immutable c = atomicOp!"+="(this.send_count, 1);
			ubyte[3] count = nativeToLittleEndian(c)[0..3];
			ubyte[] packet = [ubyte(132)];
			packet ~= count; // sequence number
			packet ~= ubyte(64); // info
			packet ~= nativeToBigEndian(cast(ushort)(_buffer.length * 8)); // length in bits
			packet ~= count; // message index
			packet ~= _buffer;
			this.sent[c] = packet;
			return this.socket.sendTo(packet, this.address);
		}
	}

	/**
	 * Receives one datagram and passes it to `handle`.
	 * Returns: the payload carried by the datagram, or an empty array
	 * when nothing was received or the datagram carried no complete payload.
	 */
	public override ubyte[] receive() {
		// recorded so that lastRecv works as it does for the other streams
		this.last_recv = this.socket.receiveFrom(this.buffer, this.address);
		if(this.last_recv > 0) {
			return this.handle(this.buffer[0..this.last_recv]);
		} else {
			return [];
		}
	}

	/**
	 * Processes a raw datagram.
	 * An id of 192 removes the listed datagrams from the resend store,
	 * an id of 160 resends the listed datagrams that are still stored and
	 * ids from 128 to 143 are acknowledged and unwrapped.
	 * Returns: a copy of the payload, the reassembled payload when the
	 * datagram was the last missing fragment of a split, or an empty
	 * array in every other case (malformed datagrams included).
	 */
	public ubyte[] handle(ubyte[] buffer) {
		if(buffer.length) {
			switch(buffer[0]) {
				case 192:
					foreach(ack ; getAck(buffer[1..$])) {
						this.sent.remove(ack);
					}
					break;
				case 160:
					foreach(nack ; getAck(buffer[1..$])) {
						auto stored = nack in this.sent;
						if(stored) {
							this.socket.sendTo(*stored, this.address);
						}
					}
					break;
				case 128: .. case 143:
					if(buffer.length > 7) {
						// send ack
						// id, length (2), unique, from (3), to (3)
						this.socket.sendTo([ubyte(192), ubyte(0), ubyte(1), ubyte(true)] ~ buffer[1..4], this.address);
						// handle packet
						size_t index = 4;
						immutable info = buffer[index++];
						index += 2; // length / 8
						if((info & 0x7F) >= 64) {
							index += 3; // message index
							if((info & 0x7F) >= 96) {
								index += 3; // order index
								index += 1; // order channel
							}
						}
						if(info & 0x10) {
							// 10 bytes of split header followed by at least one byte of data
							if(index + 10 < buffer.length && this.acceptSplit) {
								immutable count = peek!uint(buffer, &index);
								immutable id = peek!ushort(buffer, &index);
								immutable order = peek!uint(buffer, &index);
								return this.handleSplit(count, id, order, buffer[index..$]);
							}
						} else if(index <= buffer.length) {
							// the header may claim more bytes than the datagram has;
							// copy because the caller may reuse its buffer
							return buffer[index..$].dup;
						}
					}
					break;
				default:
					break;
			}
		}
		return [];
	}

	/**
	 * Stores one fragment of a split payload.
	 * Params:
	 * 		count = total number of fragments of the split
	 * 		id = id of the split
	 * 		order = position of this fragment
	 * 		buffer = content of this fragment
	 * Returns: the reassembled payload once every fragment has been
	 * received, an empty array otherwise.
	 */
	private ubyte[] handleSplit(uint count, ushort id, uint order, ubyte[] buffer) {
		auto split = id in this.splits;
		if(split is null) {
			//TODO limit count (it is read from the datagram and sizes this allocation)
			this.splits[id] = new ubyte[][](count);
			this.splitsCount[id] = 0;
			split = id in this.splits;
		}
		if(count == (*split).length && order < count) {
			// a fragment that is received again must not be counted twice
			if((*split)[order] is null) {
				this.splitsCount[id]++;
			}
			// copy: the given slice may point into a buffer that is reused
			(*split)[order] = buffer.dup;
			if(this.splitsCount[id] == count) {
				ubyte[] ret;
				foreach(b ; *split) {
					ret ~= b;
				}
				this.splits.remove(id);
				this.splitsCount.remove(id);
				return ret;
			}
		}
		return [];
	}

	/**
	 * Reads a 3-byte little-endian integer.
	 * Params:
	 * 		data = exactly 3 bytes
	 */
	private static int readTriad(ubyte[] data) {
		ubyte[4] bytes = data ~ ubyte(0);
		return littleEndianToNative!int(bytes);
	}

	/**
	 * Reads the sequence numbers listed in the body of a datagram with
	 * id 192 or 160 (id byte already removed). Each record is either a
	 * single number or an inclusive range of numbers.
	 * Returns: every listed sequence number; reading stops at the first
	 * record that is not fully contained in the buffer.
	 */
	private static int[] getAck(ubyte[] buffer) {
		int[] ret;
		if(buffer.length < 2) return ret;
		// only the second byte of the 2-byte record count is used
		size_t index = 1;
		immutable records = buffer[index++];
		foreach(i ; 0..records) {
			if(index >= buffer.length) break;
			immutable single = buffer[index++] != 0;
			if(single) {
				if(index + 3 > buffer.length) break;
				ret ~= readTriad(buffer[index..index+3]);
				index += 3;
			} else {
				if(index + 6 > buffer.length) break;
				immutable first = readTriad(buffer[index..index+3]);
				immutable last = readTriad(buffer[index+3..index+6]);
				index += 6;
				foreach(num ; first..last+1) {
					ret ~= num;
				}
			}
		}
		return ret;
	}

}