-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathapplication.py
More file actions
176 lines (143 loc) · 5.95 KB
/
application.py
File metadata and controls
176 lines (143 loc) · 5.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""Buttplug Python - Complete Application Example
This is a complete, working example that demonstrates the full workflow
of a Buttplug application. If you're new to Buttplug, start here!
Prerequisites:
1. Install Intiface Central: https://intiface.com/central
2. Start the server in Intiface Central (click "Start Server")
3. Run: python application.py
"""
import asyncio
from buttplug import ButtplugClient, DeviceOutputCommand, InputType, OutputType
from buttplug.errors import ButtplugDeviceError, ButtplugError
def print_device_capabilities(device) -> None:
"""Print the capabilities of a device."""
print(f" {device.name}")
# Check output capabilities (things we can make the device do)
outputs = []
if device.has_output(OutputType.VIBRATE):
outputs.append("Vibrate")
if device.has_output(OutputType.ROTATE):
outputs.append("Rotate")
if device.has_output(OutputType.OSCILLATE):
outputs.append("Oscillate")
if device.has_output(OutputType.POSITION) or device.has_output(
OutputType.POSITION_WITH_DURATION
):
outputs.append("Position")
if device.has_output(OutputType.CONSTRICT):
outputs.append("Constrict")
if outputs:
print(f" Outputs: {', '.join(outputs)}")
# Check input capabilities (sensors we can read)
inputs = []
if device.has_input(InputType.BATTERY):
inputs.append("Battery")
if device.has_input(InputType.RSSI):
inputs.append("RSSI")
if inputs:
print(f" Inputs: {', '.join(inputs)}")
print()
async def main() -> None:
print("===========================================")
print(" Buttplug Python Application Example")
print("===========================================\n")
# Step 1: Create a client
# The client name identifies your application to the server.
client = ButtplugClient("My Buttplug Application")
# Step 2: Set up event handlers
# Always do this BEFORE connecting to avoid missing events.
client.on_device_added = lambda d: print(f"[+] Device connected: {d.name}")
client.on_device_removed = lambda d: print(f"[-] Device disconnected: {d.name}")
client.on_disconnect = lambda: print("[!] Server connection lost!")
# Step 3: Connect to the server
print("Connecting to Intiface Central...")
try:
await client.connect("ws://127.0.0.1:12345")
except ButtplugError as e:
print("ERROR: Could not connect to Intiface Central!")
print("Make sure Intiface Central is running and the server is started.")
print("Default address: ws://127.0.0.1:12345")
print(f"Error: {e}")
return
print("Connected!\n")
# Step 4: Scan for devices
print("Scanning for devices...")
print("Turn on your Bluetooth/USB devices now.\n")
await client.start_scanning()
# Wait for devices (in a real app, you might use a UI or timeout)
input("Press Enter when your devices are connected...")
await client.stop_scanning()
# Step 5: Check what devices we found
devices = list(client.devices.values())
if not devices:
print("No devices found. Make sure your device is:")
print(" - Turned on")
print(" - In pairing/discoverable mode")
print(" - Supported by Buttplug (check https://iostindex.com)")
await client.disconnect()
return
print(f"\nFound {len(devices)} device(s):\n")
# Step 6: Display device capabilities
for device in devices:
print_device_capabilities(device)
# Step 7: Interactive device control
print("=== Interactive Control ===")
print("Commands:")
print(" v <0-100> - Vibrate all devices at percentage")
print(" s - Stop all devices")
print(" b - Read battery levels")
print(" q - Quit\n")
while True:
try:
user_input = input("> ").strip().lower()
except EOFError:
break
if not user_input:
continue
try:
if user_input.startswith("v "):
# Vibrate command
try:
percent = int(user_input[2:])
if 0 <= percent <= 100:
intensity = percent / 100.0
for device in devices:
if device.has_output(OutputType.VIBRATE):
await device.run_output(
DeviceOutputCommand(OutputType.VIBRATE, intensity)
)
print(f" {device.name}: vibrating at {percent}%")
else:
print(" Usage: v <0-100>")
except ValueError:
print(" Usage: v <0-100>")
elif user_input == "s":
# Stop all devices
await client.stop_all_devices()
print(" All devices stopped.")
elif user_input == "b":
# Read battery levels
for device in devices:
if device.has_input(InputType.BATTERY):
try:
battery = await device.battery()
print(f" {device.name}: {battery * 100:.0f}% battery")
except ButtplugDeviceError as e:
print(f" {device.name}: could not read battery - {e}")
else:
print(f" {device.name}: no battery sensor")
elif user_input == "q":
break
else:
print(" Unknown command. Use v, s, b, or q.")
except ButtplugDeviceError as e:
print(f" Device error: {e}")
except ButtplugError as e:
print(f" Error: {e}")
# Step 8: Clean up
print("\nStopping devices and disconnecting...")
await client.stop_all_devices()
await client.disconnect()
print("Goodbye!")
if __name__ == "__main__":
asyncio.run(main())