1 /*
2  * Copyright (c) 2017-2018 sel-project
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in all
12  * copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20  * SOFTWARE.
21  *
22  */
23 /**
24  * Copyright: Copyright (c) 2017-2018 sel-project
25  * License: MIT
26  * Authors: Kripth
27  * Source: $(HTTP github.com/sel-project/sel-net/sel/net/stream.d, sel/net/stream.d)
28  */
29 module sel.net.stream;
30 
31 import core.atomic : atomicOp;
32 
33 import std.algorithm : min;
34 import std.bitmanip : littleEndianToNative, nativeToLittleEndian, nativeToBigEndian, peek;
35 import std.conv : to;
36 import std.math : ceil;
37 import std.socket : Address, Socket;
38 
/**
 * Base class for every stream in this module.
 * It keeps the socket the stream works on and the status of the most
 * recent receive operation; subclasses provide the actual I/O.
 */
abstract class Stream {

	/**
	 * Socket the stream reads from and writes to.
	 */
	public Socket socket;

	/**
	 * Result of the most recent receive call, exposed through `lastRecv`.
	 * Starts at -1, before any receive has been performed.
	 */
	protected ptrdiff_t last_recv = -1;

	/**
	 * Creates a stream around an existing socket.
	 * Params:
	 * 		socket = the socket used by `send` and `receive`
	 */
	public this(Socket socket) {
		this.socket = socket;
	}

	/**
	 * Writes the given bytes to the socket.
	 * Returns: the number of bytes sent.
	 */
	public abstract ptrdiff_t send(ubyte[] buffer);

	/**
	 * Reads bytes from the socket.
	 * Returns: the received data, or an empty array on failure.
	 */
	public abstract ubyte[] receive();

	/**
	 * Gets the value stored by the last receive call performed by
	 * the stream (-1 when nothing has been stored yet).
	 */
	public @property pure nothrow @safe @nogc ptrdiff_t lastRecv() {
		return this.last_recv;
	}

}
75 
/**
 * Stream for TCP connections.
 * The socket handed to the constructor is expected to be blocking.
 */
class TcpStream : Stream {

	// Reusable receive buffer; `receive` hands out copies of its content.
	private ubyte[] buffer;

	/**
	 * Params:
	 * 		socket = connected socket used for sending and receiving
	 * 		bufferSize = size in bytes of the internal receive buffer
	 */
	public this(Socket socket, size_t bufferSize=4096) {
		super(socket);
		this.buffer = new ubyte[bufferSize];
	}

	/**
	 * Sends the whole payload, even when it is bigger than what a single
	 * socket send accepts, and returns only when everything has been sent
	 * or a send fails.
	 * Returns: the number of bytes sent (a value different from
	 * payload.length means that an error has occurred).
	 */
	public override ptrdiff_t send(ubyte[] payload) {
		// Part of the payload that still has to be written.
		ubyte[] remaining = payload;
		while(remaining.length) {
			immutable written = this.socket.send(remaining);
			if(written <= 0) break;
			remaining = remaining[written..$];
		}
		return payload.length - remaining.length;
	}

	/**
	 * Performs a single receive on the socket, using the internal buffer.
	 * The socket's result is stored and can be read through `lastRecv`.
	 * Returns: a copy of the received bytes, or an empty array when the
	 * socket returned zero or an error.
	 */
	public override ubyte[] receive() {
		this.last_recv = this.socket.receive(this.buffer);
		if(this.last_recv <= 0) return [];
		return this.buffer[0..this.last_recv].dup;
	}

}
119 
/**
 * Stream for UDP sockets.
 * It remembers one remote address: the one given to the constructor,
 * later replaced by the sender of the last datagram read with `receive`.
 */
class UdpStream : Stream {
	
	// Destination used by `send`; overwritten by `receive` with the sender's address.
	private Address address;
	// Reusable receive buffer; the receive methods hand out copies of its content.
	private ubyte[] buffer;
	
	/**
	 * Params:
	 * 		socket = datagram socket used for sending and receiving
	 * 		address = initial destination for `send`; may be null when the
	 * 			stream is only used with `sendTo`/`receiveFrom` or when
	 * 			`receive` is called before `send`
	 * 		bufferSize = size in bytes of the internal receive buffer
	 */
	public this(Socket socket, Address address=null, size_t bufferSize=1492) {
		super(socket);
		// The address used to be silently discarded, leaving `send` without a destination.
		this.address = address;
		this.buffer = new ubyte[bufferSize];
	}
	
	/**
	 * Sends a datagram to the stored address.
	 * Returns: the value returned by the socket's sendTo.
	 */
	public override ptrdiff_t send(ubyte[] buffer) {
		return this.socket.sendTo(buffer, this.address);
	}
	
	/**
	 * Sends a datagram to the given address without changing the stored one.
	 * Returns: the value returned by the socket's sendTo.
	 */
	public ptrdiff_t sendTo(ubyte[] buffer, Address address) {
		return this.socket.sendTo(buffer, address);
	}
	
	/**
	 * Receives one datagram and stores the sender's address as the
	 * destination for the following `send` calls.
	 * Returns: a copy of the received bytes, or an empty array when the
	 * socket returned zero or an error (see `lastRecv`).
	 */
	public override ubyte[] receive() {
		return this.receiveFrom(this.address);
	}
	
	/**
	 * Receives one datagram and writes the sender's address in the given
	 * reference; the stored address is left untouched.
	 * Returns: a copy of the received bytes, or an empty array when the
	 * socket returned zero or an error (see `lastRecv`).
	 */
	public ubyte[] receiveFrom(ref Address address) {
		this.last_recv = this.socket.receiveFrom(this.buffer, address);
		if(this.last_recv > 0) {
			return this.buffer[0..this.last_recv].dup;
		} else {
			return [];
		}
	}
	
}
160 
/**
 * Stream that exchanges payloads wrapped in RakNet-style datagrams.
 *
 * Outgoing payloads are prefixed with a packet id (132, or 140 for the
 * fragments of a split payload), a 3-byte sequence number and a small
 * reliability header; payloads bigger than the MTU are split in several
 * fragments. Every datagram sent is kept until the peer acknowledges it
 * (packet id 192) and is sent again when the peer reports it as missing
 * (packet id 160). Incoming data packets are acknowledged and, when
 * they carry a fragment, reassembled.
 */
class RaknetStream : Stream {
	
	// Maximum number of sequence numbers expanded from a single ACK/NACK
	// datagram: a range record is fully controlled by the peer and would
	// otherwise be able to request millions of entries.
	private enum size_t maxAckEntries = 65_536;
	
	// Remote peer; overwritten by `receive` with the sender of the last datagram.
	private Address address;
	
	/**
	 * Maximum payload size that is sent in a single datagram.
	 */
	public immutable size_t mtu;
	
	/**
	 * Indicates whether fragments of split payloads are handled
	 * (when false they are acknowledged and dropped).
	 */
	public bool acceptSplit = true;
	
	/**
	 * Maximum number of fragments accepted for one split payload.
	 * Fragments that announce a higher count (or zero) are dropped.
	 */
	public uint maxSplitCount = 4096;
	
	// Fragments received so far, by split id.
	private ubyte[][][ushort] splits;
	// Number of distinct fragments received so far, by split id.
	private size_t[ushort] splitsCount;
	
	// Reusable receive buffer.
	private ubyte[] buffer;
	
	// Last sequence number used; incremented atomically before every datagram.
	private shared int send_count = -1;
	// Id given to the next split payload.
	private ushort split_id = 0;
	
	// Datagrams waiting for an acknowledgement, by sequence number.
	private ubyte[][int] sent;
	
	/**
	 * Params:
	 * 		socket = datagram socket used for sending and receiving
	 * 		address = address of the remote peer
	 * 		mtu = maximum payload size sent in a single datagram
	 */
	public this(Socket socket, Address address, size_t mtu) {
		super(socket);
		this.address = address;
		this.mtu = mtu;
		// extra room for the headers added around the payload
		this.buffer = new ubyte[mtu + 128];
	}
	
	/**
	 * Wraps the payload and sends it to the peer, splitting it in
	 * several datagrams when it is longer than the MTU. Every datagram
	 * is stored so it can be sent again on request.
	 * Returns: for a single datagram, the value returned by the socket's
	 * sendTo; for a split payload, the total number of bytes (headers
	 * included) that the socket reported as sent.
	 */
	public override ptrdiff_t send(ubyte[] _buffer) {
		if(_buffer.length > this.mtu) {
			size_t sent = 0;
			immutable count = to!uint(ceil(_buffer.length.to!float / this.mtu));
			immutable sizes = to!uint(ceil(_buffer.length.to!float / count));
			foreach(order ; 0..count) {
				immutable c = atomicOp!"+="(this.send_count, 1);
				ubyte[] current = _buffer[order*sizes..min((order+1)*sizes, $)];
				ubyte[3] _count = nativeToLittleEndian(c)[0..3];
				ubyte[] buffer = [ubyte(140)];
				buffer ~= _count;
				buffer ~= ubyte(64 | 16); // info
				buffer ~= nativeToBigEndian(cast(ushort)(current.length * 8));
				buffer ~= _count; // message index
				buffer ~= nativeToBigEndian(count);
				buffer ~= nativeToBigEndian(this.split_id);
				buffer ~= nativeToBigEndian(order);
				buffer ~= current;
				// stored even when the send fails, so it can still be sent again on a NACK
				this.sent[c] = buffer;
				immutable result = this.socket.sendTo(buffer, this.address);
				// a negative result is an error code, not a byte count
				if(result > 0) sent += result;
			}
			this.split_id++;
			return sent;
		} else {
			immutable c = atomicOp!"+="(this.send_count, 1);
			ubyte[3] count = nativeToLittleEndian(c)[0..3];
			ubyte[] buffer = [ubyte(132)];
			buffer ~= count;
			buffer ~= ubyte(64); // info
			buffer ~= nativeToBigEndian(cast(ushort)(_buffer.length * 8));
			buffer ~= count; // message index
			buffer ~= _buffer;
			this.sent[c] = buffer;
			return this.socket.sendTo(buffer, this.address);
		}
	}
	
	/**
	 * Receives one datagram (the sender's address replaces the stored
	 * one) and passes it to `handle`. The socket's result is stored and
	 * can be read through `lastRecv`.
	 * Returns: the payload extracted by `handle`, or an empty array when
	 * nothing was received or the datagram did not complete a payload.
	 */
	public override ubyte[] receive() {
		this.last_recv = this.socket.receiveFrom(this.buffer, this.address);
		if(this.last_recv > 0) {
			return this.handle(this.buffer[0..this.last_recv]);
		} else {
			return [];
		}
	}
	
	/**
	 * Handles a raw datagram.
	 * An ACK (192) removes the acknowledged datagrams from the resend
	 * storage, a NACK (160) sends the requested datagrams again and a
	 * data packet (128 to 143) is acknowledged and unwrapped.
	 * Params:
	 * 		buffer = the datagram as received from the socket
	 * Returns: a newly allocated array with the payload of a data packet
	 * (or with the reassembled payload when the packet was the last
	 * missing fragment of a split), an empty array in any other case.
	 */
	public ubyte[] handle(ubyte[] buffer) {
		if(buffer.length) {
			switch(buffer[0]) {
				case 192:
					foreach(ack ; getAck(buffer[1..$])) {
						this.sent.remove(ack);
					}
					break;
				case 160:
					foreach(nack ; getAck(buffer[1..$])) {
						auto stored = nack in this.sent;
						if(stored) {
							this.socket.sendTo(*stored, this.address);
						}
					}
					break;
				case 128:..case 143:
					if(buffer.length > 7) {
						// send ack
						// id, length (2), unique, from (3)
						this.socket.sendTo([ubyte(192), ubyte(0), ubyte(1), ubyte(true)] ~ buffer[1..4], this.address);
						// handle packet
						size_t index = 4;
						immutable info = buffer[index++];
						index += 2; // length / 8
						if((info & 0x7F) >= 64) {
							index += 3; // message index
							if((info & 0x7F) >= 96) {
								index += 3; // order index
								index += 1; // order channel
							}
						}
						if(info & 0x10) {
							// split header (count, id, order) plus at least one byte of data
							if(index + 10 < buffer.length && this.acceptSplit) {
								immutable splitCount = peek!uint(buffer, &index);
								immutable splitId = peek!ushort(buffer, &index);
								immutable splitOrder = peek!uint(buffer, &index);
								return this.handleSplit(splitCount, splitId, splitOrder, buffer[index..$]);
							}
						} else if(index <= buffer.length) {
							// the header may claim more bytes than the datagram has;
							// copied because the input is usually the reusable receive buffer
							return buffer[index..$].dup;
						}
					}
					break;
				default:
					break;
			}
		}
		return [];
	}
	
	/**
	 * Stores a fragment of a split payload.
	 * Params:
	 * 		count = total number of fragments announced by the peer
	 * 		id = id of the split payload
	 * 		order = position of this fragment in the payload
	 * 		buffer = content of the fragment (expected to be non-empty)
	 * Returns: the reassembled payload when this fragment was the last
	 * one missing, an empty array otherwise.
	 */
	private ubyte[] handleSplit(uint count, ushort id, uint order, ubyte[] buffer) {
		// the count comes from the network and sizes an allocation
		if(count == 0 || count > this.maxSplitCount) return [];
		auto split = id in this.splits;
		if(split is null) {
			this.splits[id] = new ubyte[][](count);
			this.splitsCount[id] = 0;
			split = id in this.splits;
		}
		if(count == (*split).length && order < count) {
			// a retransmitted fragment must not be counted twice
			if((*split)[order].length == 0) {
				// copied because the caller's buffer is reused for the next datagram
				(*split)[order] = buffer.dup;
				if(++this.splitsCount[id] == count) {
					ubyte[] ret;
					foreach(fragment ; *split) {
						ret ~= fragment;
					}
					this.splits.remove(id);
					this.splitsCount.remove(id);
					return ret;
				}
			}
		}
		return [];
	}
	
	/**
	 * Reads a 3-byte little-endian integer.
	 * Params:
	 * 		data = exactly 3 bytes
	 */
	private static int readTriad(ubyte[] data) {
		ubyte[4] bytes = [data[0], data[1], data[2], ubyte(0)];
		return littleEndianToNative!int(bytes);
	}
	
	/**
	 * Expands the records of an ACK/NACK datagram in a list of sequence numbers.
	 * Parsing stops at the first truncated record or when `maxAckEntries`
	 * numbers have been collected.
	 * Params:
	 * 		buffer = the datagram without the packet id: 2 bytes for the
	 * 			number of records (only the second one is used), then for
	 * 			every record a flag and one sequence number (flag not zero)
	 * 			or the first and last sequence number of a range (flag zero)
	 * Returns: the sequence numbers found.
	 */
	private static int[] getAck(ubyte[] buffer) {
		int[] ret;
		if(buffer.length < 2) return ret;
		ubyte records = buffer[1];
		size_t index = 2;
		foreach(i ; 0..records) {
			if(index >= buffer.length || ret.length >= maxAckEntries) break;
			if(buffer[index++]) {
				if(index + 3 > buffer.length) break;
				ret ~= readTriad(buffer[index..index+3]);
				index += 3;
			} else {
				if(index + 6 > buffer.length) break;
				int first = readTriad(buffer[index..index+3]);
				int last = readTriad(buffer[index+3..index+6]);
				index += 6;
				foreach(num ; first..last+1) {
					if(ret.length >= maxAckEntries) break;
					ret ~= num;
				}
			}
		}
		return ret;
	}
	
}