"""A library to store common functions and protocol definitions"""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import with_statement
import inspect
from hashlib import (sha256, sha384)
from itertools import chain
from logging import (getLogger, DEBUG)
from socket import (SHUT_RDWR, error as SocketException, timeout as
TimeoutException)
from sys import version_info
from threading import (Lock, Thread, current_thread)
from traceback import format_exc
from uuid import uuid4
from base58 import (b58encode, b58encode_int)
from pyee import EventEmitter
from typing import (cast, Any, Callable, Dict, Iterable, List, NamedTuple,
Sequence, Tuple, Union)
from . import flags
from .messages import (compression, InternalMessage, MsgPackable)
from .utils import (getUTC, intersect, get_lan_ip, get_socket, inherit_doc,
log_entry, unpack_value)
protocol_version = "0.7"
node_policy_version = "870"
version = '.'.join((protocol_version, node_policy_version))
plock = Lock()
user_salt = str(uuid4()).encode()
[docs]class Protocol(
NamedTuple("_Protocol", [('subnet', str), ('encryption', str)])):
"""Defines service variables so that you can reject connections looking
for a different service
Attributes:
subnet: The subnet flag this Protocol uses
encryption: The encryption method this Protocol uses
id: The SHA-256 based ID of this Protocol
"""
__slots__ = ()
@property
def id(self):
# type: (Protocol) -> str
"""The SHA-256-based ID of the Protocol"""
h = sha256(''.join(str(x) for x in self).encode())
h.update(protocol_version.encode())
return b58encode_int(int(h.hexdigest(), 16))
default_protocol = Protocol('', "Plaintext") # SSL")
[docs]class BaseConnection(object):
"""The base class for a connection"""
__slots__ = ('sock', 'server', 'outgoing', 'buffer', 'id', 'time', 'addr',
'compression', 'last_sent', 'expected', 'active')
@log_entry('py2p.base.BaseConnection.__init__', DEBUG)
def __init__(self, sock, server, outgoing=False):
# type: (BaseConnection, Any, BaseSocket, bool) -> None
"""Sets up a connection to another peer-to-peer socket
Args:
sock: The connected socket object
server: A reference to your peer-to-peer socket
outgoing: Whether this connection is outgoing (default: False)
"""
self.sock = sock
self.server = server
self.outgoing = outgoing
self.buffer = bytearray()
self.id = None # type: Union[None, bytes]
self.time = getUTC()
self.addr = None # type: Union[None, Tuple[str, int]]
self.compression = [] # type: List[int]
self.last_sent = () # type: Tuple[MsgPackable, ...]
self.expected = 4
self.active = False
[docs] def send_InternalMessage(
self, # type: BaseConnection
msg # type: InternalMessage
): # type: (...) -> Union[InternalMessage, None]
"""Sends a preconstructed message
Args:
msg: The :py:class:`~py2p.base.IntenalMessage` you wish to send
Returns:
the :py:class:`~py2p.base.IntenalMessage` object you just sent, or
``None`` if the sending was unsuccessful
"""
msg.compression = self.compression # type: ignore
if msg.msg_type in (flags.whisper, flags.broadcast):
self.last_sent = msg.payload
self.__print__("Sending %s to %s" % (msg.packets, self), level=4)
if msg.compression_used:
self.__print__(
"Compressing with %s" % repr(msg.compression_used), level=4)
try:
self.sock.send(msg.string)
return msg
except (IOError, SocketException) as e: # pragma: no cover
self.server.daemon.exceptions.append(format_exc())
self.server.disconnect(self)
return None
[docs] def send(
self, # type: BaseConnection
msg_type, # type: MsgPackable
*args, # type: MsgPackable
**kargs # type: Union[bytes, int]
): # type: (...) -> InternalMessage
"""Sends a message through its connection.
Args:
msg_type: Message type, corresponds to the header in a
:py:class:`~py2p.base.InternalMessage` object
*args: A list of bytes-like objects, which correspond to the
packets to send to you
**kargs: There are two available keywords:
id: The ID this message should appear to be sent from
(default: your ID)
time: The time this message should appear to be sent from
(default: now in UTC)
Returns:
the :py:class:`~py2p.base.IntenalMessage` object you just sent, or
``None`` if the sending was unsuccessful
"""
# Latter is returned if key not found
id = cast(bytes, kargs.get('id', self.server.id))
time = cast(int, kargs.get('time') or getUTC())
# Begin real method
msg = InternalMessage(
msg_type, id, args, self.compression, timestamp=time)
return self.send_InternalMessage(msg)
@property
def protocol(self):
# type: (BaseConnection) -> Protocol
"""Returns server.protocol"""
return self.server.protocol
[docs] def collect_incoming_data(self, data):
# type: (BaseConnection, Union[bytes, bytearray]) -> bool
"""Collects incoming data
Args:
data: The most recently received :py:class:`bytes`
Returns:
``True`` if the data collection was successful, ``False`` if the
connection was closed
"""
if not bool(data):
try:
self.sock.shutdown(SHUT_RDWR)
except:
pass
return False
self.buffer.extend(data)
self.time = getUTC()
if not self.active and self.find_terminator():
self.__print__(
self.buffer, self.expected, self.find_terminator(), level=4)
self.expected = unpack_value(bytes(self.buffer[:4])) + 4
self.active = True
return True
[docs] def find_terminator(self):
# type: (BaseConnection) -> bool
"""Returns whether the defined return sequences is found"""
return len(self.buffer) >= self.expected
[docs] def found_terminator(self):
# type: (BaseConnection) -> InternalMessage
"""Processes received messages"""
raw_msg, self.buffer = bytes(
self.buffer[:self.expected]), self.buffer[self.expected:]
self.__print__("Received: %s" % repr(raw_msg), level=6)
self.active = len(self.buffer) > 4
if self.active:
self.expected = unpack_value(bytes(self.buffer[:4])) + 4
else:
self.expected = 4
msg = InternalMessage.feed_string(raw_msg, False, self.compression)
return msg
[docs] def handle_renegotiate(self, packets):
# type: (BaseConnection, Sequence[MsgPackable]) -> bool
"""The handler for connection renegotiations
This is to deal with connection maintenance. For instance, it could
be that a compression method fails to decode on the other end, and a
node will need to renegotiate which methods it is using. Hence the
name of the flag associated with it, "renegotiate".
Args:
packets: A :py:class:`tuple` containing the packets received
in this message
Returns:
``True`` if an action was taken, ``False`` if not
"""
if packets[0] == flags.renegotiate:
if packets[3] == flags.compression:
encoded_methods = packets[4]
respond = (self.compression != encoded_methods)
self.compression = list(cast(Iterable[int], encoded_methods))
self.__print__(
"Compression methods changed to: %s" %
repr(self.compression),
level=2)
if respond:
self.send(flags.renegotiate, flags.compression,
cast(Tuple[int, ...],
intersect(compression, self.compression)))
return True
elif packets[3] == flags.resend:
self.send(*self.last_sent)
return True
return False
[docs] def fileno(self):
# type: (BaseConnection) -> int
"""Mirror for the fileno() method of the connection's
underlying socket
"""
return self.sock.fileno()
def __print__(self, *args, **kargs):
# type: (BaseConnection, *Any, **int) -> None
"""Private method to print if level is <= self.server.debug_level
Args:
*args: Each argument you wish to feed to the print method
**kargs: One keyword is used here: level, which defines the
lowest value of self.server.debug_level at which
the message will be printed
"""
self.server.__print__(*args, **kargs)
[docs]class BaseDaemon(object):
"""The base class for a daemon"""
__slots__ = ('server', 'sock', 'exceptions', 'alive', '_logger',
'main_thread', 'daemon', 'conn_type')
@log_entry('py2p.base.BaseDaemon.__init__', DEBUG)
def __init__(self, addr, port, server):
# type: (Any, str, int, BaseSocket) -> None
"""Sets up a daemon process for your peer-to-peer socket
Args:
addr: The address you wish to bind to
port: The port you wish to bind to
server: A reference to the peer-to-peer socket
Raises:
socket.error: The address you wanted is already in use
ValueError: If your peer-to-peer socket is set up with an
unknown encryption method
"""
self.server = server
self.sock = get_socket(self.protocol, True)
self.sock.bind((addr, port))
self.sock.listen(5)
self.sock.settimeout(0.1)
self.exceptions = [] # type: List[str]
self.alive = True
self._logger = getLogger(
'{}.{}.{}'.format(self.__class__.__module__,
self.__class__.__name__, self.server.id))
self.main_thread = current_thread()
self.daemon = Thread(target=self.mainloop)
self.daemon.start()
@property
def protocol(self):
# type: (BaseDaemon) -> Protocol
"""Returns server.protocol"""
return self.server.protocol
[docs] def kill_old_nodes(self, handler):
# type: (BaseDaemon, BaseConnection) -> None
"""Cleans out connections which never finish a message"""
if handler.active and handler.time < getUTC() - 60:
self.server.disconnect(handler)
[docs] def process_data(self, handler):
# type: (BaseDaemon, BaseConnection) -> None
"""Collects incoming data from nodes"""
try:
while not handler.find_terminator():
if not handler.collect_incoming_data(handler.sock.recv(1024)):
self.__print__(
"disconnecting node %s while in loop" % handler.id,
level=6)
self.server.disconnect(handler)
self.server.request_peers()
return
while handler.find_terminator():
handler.found_terminator()
except TimeoutException: # pragma: no cover
return # Shouldn't happen with select, but if it does...
except Exception as e:
if (isinstance(e, SocketException) and
e.args[0] in (9, 104, 10053, 10054, 10058)):
node_id = repr(handler.id or handler)
self.__print__(
"Node %s has disconnected from the network" % node_id,
level=1)
else:
self.__print__(
"There was an unhandled exception with peer id %s. This "
"peer is being disconnected, and the relevant exception "
"is added to the debug queue. If you'd like to report "
"this, please post a copy of your MeshSocket.status to "
"git.p2p.today/issues." % handler.id,
level=0)
self.__print__("This exception was: {}".format(e), level=1)
self.exceptions.append(format_exc())
self.server.disconnect(handler)
self.server.request_peers()
def __del__(self):
# type: (BaseDaemon) -> None
self.alive = False
try:
self.sock.shutdown(SHUT_RDWR)
except: # pragma: no cover
pass
@inherit_doc(BaseConnection.__print__)
def __print__(self, *args, **kargs):
# type: (BaseDaemon, *Any, **int) -> None
self.server.__print__(*args, **kargs)
[docs]class BaseSocket(EventEmitter, object):
"""
The base class for a peer-to-peer socket abstractor
.. inheritance-diagram:: py2p.base.BaseSocket
"""
__slots__ = ('protocol', 'debug_level', 'routing_table', 'awaiting_ids',
'out_addr', 'id', '_logger', '__handlers', '__closed')
@log_entry('py2p.base.BaseSocket.__init__', DEBUG)
def __init__(
self, # type: Any
addr, # type: str
port, # type: int
prot=default_protocol, # type: Protocol
out_addr=None, # type: Union[None, Tuple[str, int]]
debug_level=0 # type: int
): # type: (...) -> None
"""Initializes a peer to peer socket
Args:
addr: The address you wish to bind to (ie: "192.168.1.1")
port: The port you wish to bind to (ie: 44565)
prot: The protocol you wish to operate over, defined by a
:py:class:`py2p.base.Protocol` object
out_addr: Your outward facing address. Only needed if you're
connecting over the internet. If you use '0.0.0.0'
for the addr argument, this will automatically be
set to your LAN address.
debug_level: The verbosity you want this socket to use when
printing event data
Raises:
socket.error: The address you wanted could not be bound, or is
otherwise used
"""
object.__init__(self)
EventEmitter.__init__(self)
self.protocol = prot
self.debug_level = debug_level
self.routing_table = {} # type: Dict[bytes, BaseConnection]
# In format {ID: handler}
self.awaiting_ids = [] # type: List[BaseConnection]
# Connected, but not handshook yet
if out_addr: # Outward facing address, if you're port forwarding
self.out_addr = out_addr
elif addr == '0.0.0.0':
self.out_addr = get_lan_ip(), port
else:
self.out_addr = addr, port
info = (str(self.out_addr).encode(), prot.id.encode(), user_salt)
h = sha384(b''.join(info))
self.id = b58encode_int(int(h.hexdigest(), 16)).encode() # type: bytes
self._logger = getLogger('{}.{}.{}'.format(
self.__class__.__module__, self.__class__.__name__, self.id))
self.__handlers = [
] # type: List[Callable[[Message, BaseConnection], Union[bool, None]]]
self.__closed = False
[docs] def close(self):
# type: (BaseSocket) -> None
"""If the socket is not closed, close the socket
Raises:
RuntimeError: The socket was already closed
"""
if self.__closed:
raise RuntimeError("Already closed")
else:
self.daemon.alive = False
self.daemon.daemon.join()
self.debug_level = 0
try:
self.daemon.sock.shutdown(SHUT_RDWR)
except:
pass
for conn in chain(
tuple(self.routing_table.values()), self.awaiting_ids):
self.disconnect(conn)
self.__closed = True
if version_info >= (3, ):
def register_handler(
self, # type: BaseSocket
method # type: Callable[..., Union[bool, None]]
): # type: (...) -> None
"""Register a handler for incoming method.
Args:
method: A function with two given arguments. Its signature
should be of the form ``handler(msg, handler)``,
where msg is a :py:class:`py2p.base.Message`
object, and handler is a
:py:class:`py2p.base.BaseConnection` object. It
should return ``True`` if it performed an action,
to reduce the number of handlers checked.
Raises:
ValueError: If the method signature doesn't parse correctly
"""
args = inspect.signature(method)
if (len(args.parameters) !=
(3 if args.parameters.get('self') else 2)):
raise ValueError(
"This method must contain exactly two arguments "
"(or three if first is self)")
self.__handlers.append(method)
else:
[docs] def register_handler(
self, # type: BaseSocket
method # type: Callable[..., Union[bool, None]]
): # type: (...) -> None
"""Register a handler for incoming method.
Args:
method: A function with two given arguments. Its signature
should be of the form ``handler(msg, handler)``,
where msg is a :py:class:`py2p.base.Message`
object, and handler is a
:py:class:`py2p.base.BaseConnection` object. It
should return ``True`` if it performed an action,
to reduce the number of handlers checked.
Raises:
ValueError: If the method signature doesn't parse correctly
"""
args = inspect.getargspec(method)
if (args[1:] != (None, None, None) or
len(args[0]) != (3 if args[0][0] == 'self' else 2)):
raise ValueError(
"This method must contain exactly two arguments "
"(or three if first is self)")
self.__handlers.append(method)
[docs] def handle_msg(self, msg, conn):
# type: (BaseSocket, Message, BaseConnection) -> Union[bool, None]
"""Decides how to handle various message types, allowing some to be
handled automatically
Args:
msg: A :py:class:`py2p.base.Message` object
conn: A :py:class:`py2p.base.BaseConnection` object
Returns:
True if an action was taken, None if not.
"""
for handler in self.__handlers:
self.__print__("Checking handler: %s" % handler.__name__, level=4)
if handler(msg, conn):
self.__print__(
"Breaking from handler: %s" % handler.__name__, level=4)
return True
return None
@property
def status(self):
# type: (BaseSocket) -> Union[str, List[str]]
"""The status of the socket.
Returns:
``"Nominal"`` if all is going well, or a list of unexpected
(Exception, traceback) tuples if not
"""
return self.daemon.exceptions or "Nominal"
@property
def outgoing(self):
# type: (BaseSocket) -> Iterable[bytes]
"""IDs of outgoing connections"""
return (handler.id for handler in self.routing_table.values()
if handler.outgoing)
@property
def incoming(self):
# type: (BaseSocket) -> Iterable[bytes]
"""IDs of incoming connections"""
return (handler.id for handler in self.routing_table.values()
if not handler.outgoing)
def __print__(self, *args, **kargs):
# type: (BaseSocket, *Any, **int) -> None
"""Private method to print if level is <= self.debug_level
Args:
*args: Each argument you wish to feed to the print method
**kargs: One keyword is used here: level, which defines the
lowest value of self.debug_level at which the message
will be printed
"""
if kargs.get('level', 0) <= self.debug_level:
with plock:
print(self.out_addr[1], *args)
def __del__(self):
# type: (BaseSocket) -> None
if not self.__closed:
self.close()
[docs]class Message(object):
"""An object which gets returned to a user, containing all necessary
information to parse and reply to a message
"""
__slots__ = ('msg', 'server')
[docs] def __init__(self, msg, server):
# type: (Message, InternalMessage, BaseSocket) -> None
"""Initializes a Message object
Args:
msg: A :py:class:`py2p.base.InternalMessage` object
server: A :py:class:`py2p.base.BaseSocket` object
"""
self.msg = msg
self.server = server
@property
def time(self):
# type: (Message) -> int
"""The time this Message was sent at"""
return self.msg.time
@property # type: ignore
@inherit_doc(InternalMessage.time_58)
def time_58(self):
# type: (Message) -> bytes
return self.msg.time_58
@property
def sender(self):
# type: (Message) -> bytes
"""The ID of this Message's sender"""
return self.msg.sender
@property # type: ignore
@inherit_doc(InternalMessage.id)
def id(self):
# type: (Message) -> bytes
return self.msg.id
@property # type: ignore
@inherit_doc(InternalMessage.payload)
def packets(self):
# type: (Message) -> Tuple[MsgPackable, ...]
return self.msg.payload
@inherit_doc(InternalMessage.__len__)
def __len__(self):
# type: (Message) -> int
return self.msg.__len__()
def __repr__(self):
# type: (Message) -> str
packets = self.packets
return "Message(type={}, packets={}, sender={})".format(
packets[0], packets[1:], self.sender)
[docs] def reply(self, *args):
# type: (Message, *MsgPackable) -> None
"""Replies to the sender if you're directly connected. Tries to make
a connection otherwise
Args:
*args: Each argument given is a packet you wish to send. This is
prefixed with base.flags.whisper, so the other end will
receive ``[base.flags.whisper, *args]``
"""
self.server._logger.debug(
'Initiating a direct reply to Message ID {}'.format(self.id))
if self.server.routing_table.get(self.sender):
self.server.routing_table.get(self.sender).send(
flags.whisper, flags.whisper, *args)
else:
self.server._logger.debug('Requesting connection for direct reply'
' to Message ID {}'.format(self.id))
request_hash = sha384(
self.sender + b58encode_int(getUTC()).decode()).hexdigest()
request_id = b58encode_int(int(request_hash, 16)).decode()
self.server.send(request_id, self.sender, type=flags.request)
to_send = (flags.whisper,
flags.whisper) # type: Tuple[MsgPackable, ...]
self.server.requests[request_id] = to_send + args
self.server._logger.critical(
"You aren't connected to the original sender. This reply is "
"not guarunteed, but we're trying to make a connection and "
"put the message through.")