forked from LagrangeDev/lagrange-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
110 lines (87 loc) · 3.21 KB
/
main.py
File metadata and controls
110 lines (87 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import os
import logging
import asyncio
from lagrange.utils.sign import sign_provider
from lagrange.client.client import Client
from lagrange.info.app import app_list
from lagrange.info.device import DeviceInfo
from lagrange.info.sig import SigInfo
from lagrange.client.server_push.events.group import GroupMessage
from lagrange.client.message.elems import Text
DEVICE_INFO_PATH = "./device.json"
SIGINFO_PATH = "./sig.bin"
class InfoManager:
def __init__(self, uin: int, device_info_path: str, sig_info_path: str):
self.uin: int = uin
self._device_info_path: str = device_info_path
self._sig_info_path: str = sig_info_path
self._device = None
self._sig_info = None
@property
def device(self) -> DeviceInfo:
assert self._device, "Device not initialized"
return self._device
@property
def sig_info(self) -> SigInfo:
assert self._sig_info, "SigInfo not initialized"
return self._sig_info
def save_all(self):
with open(self._sig_info_path, "wb") as f:
f.write(self._sig_info.dump())
with open(self._device_info_path, "wb") as f:
f.write(self._device.dump())
print("device info saved")
def __enter__(self):
if os.path.isfile(self._device_info_path):
with open(self._device_info_path, "rb") as f:
self._device = DeviceInfo.load(f.read())
else:
print(f"{self._device_info_path} not found, generating...")
self._device = DeviceInfo.generate(self.uin)
if os.path.isfile(self._sig_info_path):
with open(self._sig_info_path, "rb") as f:
self._sig_info = SigInfo.load(f.read())
else:
print(f"{self._sig_info_path} not found, generating...")
self._sig_info = SigInfo.new(8848)
return self
def __exit__(self, *_):
pass
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s %(name)s[%(levelname)s]: %(message)s"
)
async def heartbeat_task(client: Client):
while True:
await client.online.wait()
await asyncio.sleep(120)
print(f"{round(await client.sso_heartbeat(True) * 1000, 2)}ms to server")
async def msg_handler(client: Client, event: GroupMessage):
print(event)
if event.msg.startswith("114514"):
await client.send_grp_msg([Text("1919810")], event.grp_id)
print(f"{event.nickname}({event.grp_name}): {event.msg}")
async def main():
uin = int(os.environ.get("LAGRANGE_UIN", "0"))
sign_url = os.environ.get("LAGRANGE_SIGN_URL", "")
app = app_list["linux"]
with InfoManager(uin, DEVICE_INFO_PATH, SIGINFO_PATH) as im:
client = Client(
uin,
app,
im.device,
im.sig_info,
sign_provider(sign_url) if sign_url else None
)
client.events.subscribe(GroupMessage, msg_handler)
client.connect()
asyncio.create_task(heartbeat_task(client))
if im.sig_info.d2:
if not await client.register():
await client.login()
else:
await client.login()
im.save_all()
await client.wait_closed()
if __name__ == '__main__':
asyncio.run(main())