-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinit.sql
More file actions
270 lines (224 loc) · 6.44 KB
/
init.sql
File metadata and controls
270 lines (224 loc) · 6.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
SET table.sql-dialect = default;
CREATE TABLE IF NOT EXISTS `user_table` (
`id` BIGINT,
`name` STRING,
`country` STRING,
`age` INT,
`os` STRING,
`network` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'url' = 'redis://${NODE_IP}:${REDIS_PORT}/0'
);
CREATE TABLE IF NOT EXISTS `item_table` (
`id` BIGINT,
`name` STRING,
`price` FLOAT,
`brand` STRING,
`category1` STRING,
`category2` STRING,
`category3` STRING,
`category4` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'url' = 'redis://${NODE_IP}:${REDIS_PORT}/0'
);
CREATE TABLE IF NOT EXISTS `global_hot_item` (
`invert_key` STRING,
`id` BIGINT,
`score` FLOAT,
PRIMARY KEY (invert_key) NOT ENFORCED
) WITH (
'connector' = 'redis',
'data-structure' = 'list',
'url' = 'redis://${NODE_IP}:${REDIS_PORT}/0'
);
CREATE TABLE IF NOT EXISTS `user_interest_category1` (
`user_id` BIGINT,
`category1` STRING,
`score` FLOAT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'data-structure' = 'list',
'url' = 'redis://${NODE_IP}:${REDIS_PORT}/0'
);
CREATE TABLE IF NOT EXISTS `category1_hot_item` (
`category1` STRING,
`item_id` BIGINT,
`score` FLOAT,
PRIMARY KEY (category1) NOT ENFORCED
) WITH (
'connector' = 'redis',
'data-structure' = 'list',
'url' = 'redis://${NODE_IP}:${REDIS_PORT}/0'
);
CREATE TABLE IF NOT EXISTS `user_recent_click_item` (
`user_id` BIGINT,
`item_id` BIGINT,
`bhv_time` BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'data-structure' = 'list',
'url' = 'redis://${NODE_IP}:${REDIS_PORT}/0'
);
CREATE TABLE IF NOT EXISTS `user_exposure_item` (
`user_id` BIGINT,
`item_id` BIGINT,
`bhv_time` BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'redis',
'data-structure' = 'list',
'url' = 'redis://${NODE_IP}:${REDIS_PORT}/0',
'cache-ttl' = '0'
);
CREATE TABLE IF NOT EXISTS `itemcf_i2i` (
`item_id1` BIGINT,
`item_id2` BIGINT,
`score` FLOAT,
PRIMARY KEY (item_id1) NOT ENFORCED
) WITH (
'connector' = 'redis',
'data-structure' = 'list',
'url' = 'redis://${NODE_IP}:${REDIS_PORT}/0'
);
CREATE TABLE IF NOT EXISTS `item_embedding` (
`id` BIGINT,
`embedding` ARRAY<FLOAT>,
`name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'milvus',
'url' = 'http://${NODE_IP}:${MILVUS_PORT}',
'token' = 'root:Milvus',
'database' = 'default',
'collection' = 'item_embedding'
);
CREATE TABLE IF NOT EXISTS `rec_log_kafka` (
`user_id` BIGINT,
`item_id` BIGINT,
`item_name` STRING,
`rec_reason` STRING,
`req_time` BIGINT,
`req_id` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'rec_log',
'properties.bootstrap.servers' = '${NODE_IP}:${KAFKA_PORT}',
'format' = 'json'
);
-- save final_recall_item to kafka
create or replace sql function save_rec_item;
define input table final_recall_item(
`user_id` BIGINT,
`item_id` BIGINT,
`item_name` STRING,
`rec_reason` STRING,
`req_time` BIGINT,
`req_id` STRING
);
insert into rec_log_kafka
select * from final_recall_item;
insert into user_exposure_item
select user_id, item_id, req_time from final_recall_item;
return;
-- main_rec function
create or replace sql function main_rec;
define input table user_info(id bigint);
cache table recall_item_schema as select
cast(0 as BIGINT) as item_id,
cast('' as varchar) as rec_reason;
cache table recall_item as call get('recall_fun')(user_info) like recall_item_schema;
cache table rec_item as
select item_id, category1, name, rec_reason
from
recall_item join item_table on id = item_id;
cache table diversify_rec_item as call window_diversify(rec_item, 'category1', '3', '1', '10');
cache table request_meta as select
user_info.id as user_id,
cast(CURRENT_TIMESTAMP as BIGINT) as req_time,
uuid() as req_id
from user_info;
cache table final_rec_item as
select
request_meta.user_id as user_id,
item_id,
diversify_rec_item.name as item_name,
rec_reason,
request_meta.req_time as req_time,
request_meta.req_id as req_id
from
request_meta join diversify_rec_item on 1=1;
call save_rec_item(final_rec_item) async;
return final_rec_item;
-- recall function
create or replace sql function recall_fun;
define input table user_info(id bigint);
cache table user_embedding as
select ARRAY[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8] as embedding;
cache table vector_recall as
select item_embedding.id as item_id
from
user_embedding join item_embedding on 1=1
order by ip(user_embedding.embedding, item_embedding.embedding)
limit 300;
cache table vector_recall as
select item_id, 'vector_recall' as rec_reason
from vector_recall;
cache table exposured_item as
select item_id
from
user_info join user_exposure_item on user_id = user_info.id;
cache table cur_recent_click_item as
select item_id
from
user_info join user_recent_click_item on user_id = user_info.id
limit 10;
cache table i2i_recall as
select item_id2 as item_id, 'itemcf_recall' as rec_reason
from
cur_recent_click_item join itemcf_i2i on item_id1 = cur_recent_click_item.item_id
limit 300;
cache table global_hot_recall as
select id as item_id, 'global_hot_recall' as rec_reason
from
global_hot_item where invert_key = 'global'
limit 300;
cache table cur_user_interest_category1 as
select category1
from
user_info join user_interest_category1 on user_id = user_info.id
limit 10;
cache table category1_recall as
select item_id as item_id, 'user_category1_interest_recall' as rec_reason
from
cur_user_interest_category1 join category1_hot_item
on category1_hot_item.category1 = cur_user_interest_category1.category1
limit 300;
cache table dedup_i2i_recall as call dedup(i2i_recall, exposured_item, 'item_id', 'item_id');
cache table dedup_global_hot_recall as call dedup(global_hot_recall, exposured_item, 'item_id', 'item_id');
cache table dedup_category1_recall as call dedup(category1_recall, exposured_item, 'item_id', 'item_id');
cache table dedup_vector_recall as call dedup(vector_recall, exposured_item, 'item_id', 'item_id');
cache table all_recall_item as
select * from dedup_i2i_recall
union all
select * from dedup_global_hot_recall
union all
select * from dedup_category1_recall
union all
select * from dedup_vector_recall;
cache table truncate_recall_item as
select item_id, rec_reason
from all_recall_item
limit 300;
cache table final_recall_item as
select item_id, LISTAGG(distinct rec_reason) as rec_reason
from truncate_recall_item
group by item_id;
return final_recall_item;
-- main_rec api
create or replace api main_rec with main_rec;