Sync Module¶
-
class
py2p.sync.SyncSocket(*args, **kwargs)[source]¶ This class is used to sync dictionaries between programs. It extends
py2p.mesh.MeshSocket
Because of this inheritance, this can also be used as an alert network
This also implements and optional leasing system by default. This leasing system means that if node A sets a key, node B cannot overwrite the value at that key for an hour.
This may be turned off by adding
leasing=Falseto the constructor.Added Events:
-
Event ‘update’(conn, key, data, new_meta) This event is triggered when a key is updated in your synchronized dictionary.
new_metawill be an object containing metadata of this change, including the time of change, and who initiated the change.Parameters: - conn (py2p.sync.SyncSocket) – A reference to this abstract socket
- key (bytes) – The key which has a new value
- new_data – The new value at that key
- new_meta (py2p.sync.metatuple) – Metadata on the key changer
-
Event ‘delete’(conn, key) This event is triggered when a key is deleted from your synchronized dictionary.
Parameters: - conn (py2p.sync.SyncSocket) – A reference to this abstract socket
- key (bytes) – The key which has a new value
-
__init__(*args, **kwargs)¶ Initialize a chord socket
Parameters: - addr – The address you wish to bind to (ie: “192.168.1.1”)
- port – The port you wish to bind to (ie: 44565)
- prot – The Protocol you wish to operate over, defined by
a
py2p.base.Protocolobject - out_addr – Your outward facing address. Only needed if you’re connecting over the internet. If you use ‘0.0.0.0’ for the addr argument, this will automatically be set to your LAN address.
- debug_level – The verbosity you want this socket to use when printing event data
Raises: SocketException– The address you wanted could not be bound, or is otherwise used
-
data¶
-
metadata¶
-
_send_peers(handler)[source]¶ Shortcut method to send a handshake response. This method is extracted from
__handle_handshake()in order to allow cleaner inheritence frompy2p.sync.SyncSocket
-
__setitem__(key, data)[source]¶ Updates the value at a given key.
Parameters: Raises:
-
set(key, data)[source]¶ Updates the value at a given key.
Parameters: Raises:
-
update(update_dict)[source]¶ Equivalent to
dict.update()This calls
SyncSocket.__setitem__()for each key/value pair in the given dictionary.Parameters: update_dict – A dict-like object to extract key/value pairs from. Key and value be astrorbytes-like objectRaises: KeyError– If you do not have the lease for this slot. Lease is given automatically for one hour if the slot is open.
-
__getitem__(key)[source]¶ Looks up the value at a given key.
Parameters: key – The key that you wish to check. Must be a strorbytes-like objectReturns: The value at said key Raises: KeyError– If there is no value assigned at that key
-
get(key, ifError=None)[source]¶ Retrieves the value at a given key.
Parameters: Returns: The value at said key, or the value at ifError if there’s an
Exception
-
apply_delta(key, delta)[source]¶ Updates a stored mapping with the given delta. This allows for more graceful handling of conflicting changes
Parameters: Raises: TypeError– If the updated key does not store a mapping already
-
pop(key, *args)[source]¶ Returns a value, with the side effect of deleting that association
Parameters: Returns: The value of the supplied key, or
ifErrorRaises: KeyError– If the key does not have an associated value
-
popitem()[source]¶ Returns an association, with the side effect of deleting that association
Returns: An arbitrary association
-
_BaseSocket__closed¶
-
_BaseSocket__handlers¶
-
_MeshSocket__clean_waterfalls()¶ This function cleans the
setof recently relayed messages based on the following heuristics:- Delete all older than 60 seconds
-
_MeshSocket__handle_handshake(msg, handler)¶ This callback is used to deal with handshake signals. Its three primary jobs are:
- reject connections seeking a different network
- set connection state
- deal with connection conflicts
Parameters: - msg – A
Message - handler – A
MeshConnection
Returns: Either
TrueorNone
-
_MeshSocket__handle_request(msg, handler)¶ This callback is used to deal with request signals. Its three primary jobs are:
- respond with a peers signal if packets[1] is
'*' - if you know the ID requested, respond to it
- if you don’t, make a request with your peers
Parameters: - msg – A
Message - handler – A
MeshConnection
Returns: Either
TrueorNone- respond with a peers signal if packets[1] is
-
_MeshSocket__handle_response(msg, handler)¶ This callback is used to deal with response signals. Its two primary jobs are:
- if it was your request, send the deferred message
- if it was someone else’s request, relay the information
Parameters: - msg – A
Message - handler – A
MeshConnection
Returns: Either
TrueorNone
-
_MeshSocket__resolve_connection_conflict(handler, h_id)¶ Sometimes in trying to recover a network a race condition is created. This function applies a heuristic to try and organize the fallout from that race condition. While it isn’t perfect, it seems to have increased connection recovery rate from ~20% to ~75%. This statistic is from memory on past tests. Much improvement can be made here, but this statistic can likely never be brought to 100%.
In the failure condition, the overall network is unaffected for large networks. In small networks this failure condition causes a fork, usually where an individual node is kicked out.
Parameters: - handler – The handler with whom you have a connection conflict
- h_id – The id of this handler
-
_SyncSocket__check_lease(key, new_data, new_meta, delta=False)¶
-
_SyncSocket__delta(key, delta, new_meta, error=True)¶ Updates a stored mapping with the given delta. This allows for more graceful handling of conflicting changes
Parameters:
-
_SyncSocket__handle_delta(msg, handler)¶ This callback is used to deal with delta storage signals. Its primary job is:
- update the mapping in a given key
- Args:
- msg: A
Messagehandler: AMeshConnection - Returns:
- Either
TrueorNone
-
_SyncSocket__handle_store(msg, handler)¶ This callback is used to deal with data storage signals. Its two primary jobs are:
- store data in a given key
- delete data in a given key
- Args:
- msg: A
Messagehandler: AMeshConnection - Returns:
- Either
TrueorNone
-
_SyncSocket__leasing¶
-
_SyncSocket__store(key, new_data, new_meta, error=True)¶ Private API method for storing data. You have permission to store something if:
- The network is not enforcing leases, or
- There is no value at that key, or
- The lease on that key has lapsed (not been set in the last hour), or
- You are the owner of that key
Parameters: - key – The key you wish to store data at
- new_data – The data you wish to store in said key
- new_meta – The metadata associated with this storage
- error – A boolean which says whether to raise a
KeyErrorif you can’t store there
Raises: KeyError– If someone else has a lease at this value, anderrorisTrue
-
_get_peer_list()¶ This function is used to generate a list-formatted group of your peers. It goes in format
[ ((addr, port), ID), ...]
-
_handle_peers(msg, handler)¶ This callback is used to deal with peer signals. Its primary jobs is to connect to the given peers, if this does not exceed
py2p.mesh.max_outgoingParameters: - msg – A
Message - handler – A
MeshConnection
Returns: Either
TrueorNone- msg – A
-
_logger¶
-
_send_handshake(handler)¶ Shortcut method for sending a handshake to a given handler
Parameters: handler – A MeshConnection
-
awaiting_ids¶
-
close()¶ If the socket is not closed, close the socket
Raises: RuntimeError– The socket was already closed
-
connect(addr, port, id=None, conn_type=<class ‘py2p.mesh.MeshConnection’>)¶ This function connects you to a specific node in the overall network. Connecting to one node should connect you to the rest of the network, however if you connect to the wrong subnet, the handshake failure involved is silent. You can check this by looking at the truthiness of this objects routing table. Example:
>>> conn = mesh.MeshSocket('localhost', 4444) >>> conn.connect('localhost', 5555) >>> # do some other setup for your program >>> if not conn.routing_table: ... conn.connect('localhost', 6666) # any fallback address
Parameters: - addr – A string address
- port – A positive, integral port
- id – A string-like object which represents the expected ID of this node
-
daemon¶
-
debug_level¶
-
disconnect(handler)¶ Closes a given connection, and removes it from your routing tables
Parameters: handler – the connection you would like to close
-
emit(event, *args, **kwargs)¶ Emit
event, passing*argsand**kwargsto each attached function. ReturnsTrueif any functions are attached toevent; otherwise returnsFalse.Example:
ee.emit('data', '00101001')
Assuming
datais an attached function, this will calldata('00101001')'.For coroutine event handlers, calling emit is non-blocking. In other words, you do not have to await any results from emit, and the coroutine is scheduled in a fire-and-forget fashion.
-
handle_msg(msg, conn)¶ Decides how to handle various message types, allowing some to be handled automatically
Parameters: - msg – A
py2p.base.Messageobject - conn – A
py2p.base.BaseConnectionobject
Returns: True if an action was taken, None if not.
- msg – A
-
id¶
-
incoming¶ IDs of incoming connections
-
listeners(event)¶ Returns the list of all listeners registered to the
event.
-
on(event, f=None)¶ Registers the function (or optionally an asyncio coroutine function)
fto the event nameevent.If
fisn’t provided, this method returns a function that takesfas a callback; in other words, you can use this method as a decorator, like so:@ee.on('data') def data_handler(data): print(data)
As mentioned, this method can also take an asyncio coroutine function:
@ee.on('data') async def data_handler(data) await do_async_thing(data)
This will automatically schedule the coroutine using the configured scheduling function (defaults to
asyncio.ensure_future) and the configured event loop (defaults toasyncio.get_event_loop()).
-
once(event, f=None)¶ The same as
ee.on, except that the listener is automatically removed after being called.
-
out_addr¶
-
outgoing¶ IDs of outgoing connections
-
protocol¶
-
queue¶
-
recv(quantity=1)¶ This function has two behaviors depending on whether quantity is left as default.
If quantity is given, it will return a list of
Messageobjects up to length quantity.If quantity is left alone, it will return either a single
Messageobject, orNoneParameters: quantity – The maximum number of Messages you would like to pull (default: 1)Returns: A list of Messages, an empty list, a singleMessage, orNone
-
register_handler(method)¶ Register a handler for incoming method.
Parameters: method – A function with two given arguments. Its signature should be of the form handler(msg, handler), where msg is apy2p.base.Messageobject, and handler is apy2p.base.BaseConnectionobject. It should returnTrueif it performed an action, to reduce the number of handlers checked.Raises: ValueError– If the method signature doesn’t parse correctly
-
remove_all_listeners(event=None)¶ Remove all listeners attached to
event. IfeventisNone, remove all listeners on all events.
-
remove_listener(event, f)¶ Removes the function
ffromevent.
-
request_peers()¶ Requests your peers’ routing tables
-
requests¶
-
routing_table¶
-
send(*args, **kargs)¶ This sends a message to all of your peers. If you use default values it will send it to everyone on the network
Parameters: - *args – A list of objects you want your peers to receive
- **kargs – There are two keywords available:
- flag – A string or bytes-like object which defines your flag. In other words, this defines packet 0.
- type – A string or bytes-like object which defines your message type. Changing this from default can have adverse effects.
Raises: TypeError– If any of the arguments are not serializable. This means your objects must be one of the following:Warning
If you change the type attribute from default values, bad things could happen. It MUST be a value from
py2p.base.flags, and more specifically, it MUST be eitherbroadcastorwhisper. The only other valid flags arewaterfallandrenegotiate, but these are RESERVED and must NOT be used.
-
status¶ The status of the socket.
Returns: "Nominal"if all is going well, or a list of unexpected (Exception, traceback) tuples if not
-
waterfall(msg)¶ This function handles message relays. Its return value is based on whether it took an action or not.
Parameters: msg – The Messagein questionReturns: Trueif the message was then forwarded.Falseif not.
-
waterfalls¶
-