Skip to content

missing messages #35

@chrisdenley

Description

@chrisdenley

I seem to be losing messages from the websocket stream intermittently. I'm not sure whether the messages are being dropped by AnyEvent::WebSocket::Client or by the socket, or whether the server simply isn't sending them. When it happens, almost a couple hundred messages go missing at a time.

I haven't re-created the issue using this simple test script yet (very intermittent), but it should effectively be doing the same thing as far as the web socket is concerned.

Is there a limit to how much websocket data can be buffered, and what happens when that limit is exceeded? Any idea how I could troubleshoot this issue? Since it uses SSL, I can't check a packet capture to see if the missing messages were actually sent.

#!/usr/bin/perl
use strict;
use warnings;
use AnyEvent::WebSocket::Client;
use JSON::XS;

# Product ids sent in the subscribe request and iterated over by process().
my @prods = ('BTC-USD', 'LTC-USD', 'ETH-USD');
# Websocket endpoint the client connects to.
my $wssurl = 'wss://ws-feed.gdax.com/';
# %seq:     latest sequence number received per product (written by load_msg).
# %lastseq: sequence number process() last handled per product.
my (%seq, %lastseq);
# $connection: set inside the connect callback; undef until then.
# $cv:         condvar the main program blocks on until the connection finishes.
my ($connection, $cv);

# Signal handler for INT/TERM.
# With no connection object yet there is nothing to tear down, so the
# process exits immediately with status 0. Otherwise the websocket
# connection is asked to close.
sub shutdown {
	exit(0) unless defined $connection;
	$connection->close();
}

# Route INT/TERM through shutdown() so an open connection is closed rather
# than the process being killed outright.
$SIG{'INT'} = \&shutdown;
$SIG{'TERM'} = \&shutdown;

# Create the main condvar BEFORE connecting: a condvar callback is invoked
# immediately when the condvar is already resolved, so the callback below must
# never be able to observe an undefined $cv.
$cv = AnyEvent->condvar;

my $sock = AnyEvent::WebSocket::Client->new;
$sock->connect($wssurl)->cb(sub {
	my ($connect_cv) = @_;

	# recv rethrows a connect failure. Catch it and report the real reason
	# instead of failing later with "Can't call method send on an undefined
	# value".
	my $ok = eval { $connection = $connect_cv->recv; 1 };
	if (!$ok or !defined $connection) {
		my $err = $@ || "unknown error\n";
		$cv->croak("Connect to $wssurl failed: $err");
		return;
	}

	# Install the handlers first so they are in place before any traffic is
	# requested from the server.
	$connection->on(each_message => \&load_msg);
	$connection->on(finish => sub {
		print "CONNECTION CLOSED\n";
		# Release the blocking recv at the bottom of the script.
		$cv->croak();
	});

	my $subscribe = {
		type => 'subscribe',
		product_ids => \@prods,
		channels => ['full']
	};
	my $json = encode_json($subscribe);
	print $json."\n";
	$connection->send($json);
});

# process() runs whenever the event loop has nothing else to do.
my $idle = AnyEvent->idle(cb => \&process);
$cv->recv;

# Idle callback: for every subscribed product whose latest received sequence
# number differs from the one handled last time, record the new sequence
# number and simulate one second of processing work.
sub process {
	for my $product (@prods) {
		my $current = $seq{$product};
		# No message has been received for this product yet.
		next if !defined $current;

		my $handled = $lastseq{$product};
		# Nothing new has arrived since the previous pass.
		next if defined $handled && $handled == $current;

		$lastseq{$product} = $current;
		print "processing new data for $product\n";
		sleep 1;
	}
}

# each_message handler: decode one feed message and check that the sequence
# numbers of each product arrive without gaps.
#
# Arguments: ($conn, $msg) as supplied by the connection's each_message
#            event; only $msg->body is used.
# Effects:   prints "<product_id> <sequence>" for every message carrying both
#            fields and stores the sequence in %seq.
# Dies:      when a product's sequence is not exactly one greater than the
#            previously stored sequence for that product.
sub load_msg {
	my ($conn, $msg) = @_;

	# A payload that is not valid JSON cannot carry a sequence number. Report
	# it and keep the stream alive rather than dying with a bare parser error.
	my $data;
	my $ok = eval { $data = decode_json($msg->body); 1 };
	if (!$ok) {
		warn "Ignoring undecodable message: $@";
		return;
	}

	# Only JSON objects can hold the fields inspected below.
	return unless ref $data eq 'HASH';

	my $aseq = $data->{'sequence'};
	my $aprod = $data->{'product_id'};

	# Messages lacking either field are not sequence-checked.
	return unless defined $aseq and defined $aprod;

	print "$aprod $aseq\n";
	if (defined $seq{$aprod} and $aseq != $seq{$aprod} + 1) {
		die("Missed $aprod message(s), went from ".$seq{$aprod}." to $aseq");
	}
	$seq{$aprod} = $aseq;
	return;
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions