Skip to content

Commit dfd1b98

Browse files
committed
[Stream] Move most of Socket\Connection logic to Stream\Stream, generic ftw
1 parent 4503a95 commit dfd1b98

File tree

1 file changed

+10
-112
lines changed

1 file changed

+10
-112
lines changed

Connection.php

Lines changed: 10 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -6,134 +6,32 @@
66
use React\EventLoop\LoopInterface;
77
use React\Stream\WritableStream;
88
use React\Stream\Buffer;
9+
use React\Stream\Stream;
910
use React\Stream\Util;
1011

11-
class Connection extends EventEmitter implements ConnectionInterface
12+
class Connection extends Stream implements ConnectionInterface
1213
{
13-
public $bufferSize = 4096;
14-
public $socket;
15-
private $readable = true;
16-
private $writable = true;
17-
private $closing = false;
18-
private $loop;
19-
private $buffer;
20-
21-
public function __construct($socket, LoopInterface $loop)
22-
{
23-
$this->socket = $socket;
24-
$this->loop = $loop;
25-
$this->buffer = new Buffer($this->socket, $this->loop);
26-
27-
$that = $this;
28-
29-
$this->buffer->on('error', function ($error) use ($that) {
30-
$that->emit('error', array($error, $that));
31-
$that->close();
32-
});
33-
34-
$this->buffer->on('drain', function () use ($that) {
35-
$that->emit('drain');
36-
});
37-
38-
$this->resume();
39-
}
40-
41-
public function isReadable()
42-
{
43-
return $this->readable;
44-
}
45-
46-
public function isWritable()
47-
{
48-
return $this->writable;
49-
}
50-
51-
public function pause()
52-
{
53-
$this->loop->removeReadStream($this->socket);
54-
}
55-
56-
public function resume()
57-
{
58-
$this->loop->addReadStream($this->socket, array($this, 'handleData'));
59-
}
60-
61-
public function write($data)
14+
public function handleData($stream)
6215
{
63-
if (!$this->writable) {
64-
return;
65-
}
66-
67-
return $this->buffer->write($data);
68-
}
69-
70-
public function close()
71-
{
72-
if (!$this->writable && !$this->closing) {
73-
return;
74-
}
75-
76-
$this->closing = false;
77-
78-
$this->readable = false;
79-
$this->writable = false;
80-
81-
$this->emit('end', array($this));
82-
$this->emit('close', array($this));
83-
$this->loop->removeStream($this->socket);
84-
$this->buffer->removeAllListeners();
85-
$this->removeAllListeners();
86-
if (is_resource($this->socket)) {
87-
stream_socket_shutdown($this->socket, STREAM_SHUT_RDWR);
88-
fclose($this->socket);
89-
}
90-
}
91-
92-
public function end($data = null)
93-
{
94-
if (!$this->writable) {
95-
return;
96-
}
97-
98-
$this->closing = true;
99-
100-
$this->readable = false;
101-
$this->writable = false;
102-
103-
$that = $this;
104-
105-
$this->buffer->on('close', function () use ($that) {
106-
$that->close();
107-
});
108-
109-
$this->buffer->end($data);
110-
}
111-
112-
public function pipe(WritableStream $dest, array $options = array())
113-
{
114-
Util::pipe($this, $dest, $options);
115-
116-
return $dest;
117-
}
118-
119-
public function handleData($socket)
120-
{
121-
$data = stream_socket_recvfrom($socket, $this->bufferSize);
16+
$data = stream_socket_recvfrom($stream, $this->bufferSize);
12217
if ('' === $data || false === $data) {
12318
$this->end();
12419
} else {
12520
$this->emit('data', array($data, $this));
12621
}
12722
}
12823

129-
public function getBuffer()
24+
public function handleClose()
13025
{
131-
return $this->buffer;
26+
if (is_resource($this->stream)) {
27+
stream_socket_shutdown($this->stream, STREAM_SHUT_RDWR);
28+
fclose($this->stream);
29+
}
13230
}
13331

13432
public function getRemoteAddress()
13533
{
136-
return $this->parseAddress(stream_socket_get_name($this->socket, true));
34+
return $this->parseAddress(stream_socket_get_name($this->stream, true));
13735
}
13836

13937
private function parseAddress($address)

0 commit comments

Comments
 (0)