-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdac.py
More file actions
155 lines (120 loc) · 4.95 KB
/
dac.py
File metadata and controls
155 lines (120 loc) · 4.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import wandb
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
from collections import deque
from algorithm import Algorithm
class DAC(Algorithm):
    """Discriminator-Actor-Critic style adversarial imitation reward learner.

    Trains an MLP discriminator to separate policy states from expert states
    (GAN loss + WGAN-GP gradient penalty) and exposes the discriminator's
    output as a reward signal via :meth:`reward_fn`.
    """

    # Weight of the WGAN-GP gradient penalty term in the discriminator loss.
    GRAD_PENALTY_WEIGHT = 10.0

    def __init__(self, cfg, obs_dim):
        """Build the discriminator MLP and its optimizer.

        Args:
            cfg: project config; reads ``cfg.use_linear`` and the
                ``cfg.alg.*`` fields (reward_style, hidden_dim, hidden_depth,
                lr, normalize_reward, target_std).
            obs_dim: flat observation dimensionality (int only — image
                observations are not supported).
        """
        super().__init__(cfg, obs_dim)
        assert isinstance(
            obs_dim, int
        ), "DAC only supports 1D observation space for now"
        self.reward_style = cfg.alg.reward_style
        # When use_linear is set the MLP has no nonlinearities, making the
        # discriminator a purely linear function of the observation.
        self.linear = cfg.use_linear
        if self.linear:
            print("No activation function in MLP for DAC")
        modules = [
            nn.Linear(obs_dim, cfg.alg.hidden_dim),
            nn.LeakyReLU() if not self.linear else nn.Identity(),
        ]
        for _ in range(cfg.alg.hidden_depth - 1):
            modules.append(nn.Linear(cfg.alg.hidden_dim, cfg.alg.hidden_dim))
            modules.append(nn.LeakyReLU() if not self.linear else nn.Identity())
        modules.append(nn.Linear(cfg.alg.hidden_dim, 1))  # single logit out
        self.model = nn.Sequential(*modules).to(self.device)
        self.optimizer = optim.AdamW(self.model.parameters(), lr=cfg.alg.lr)
        self.normalize_reward = cfg.alg.normalize_reward
        self.target_std = cfg.alg.target_std

    def compute_gan_loss(self, policy_output, expert_output):
        """Return the mean discriminator BCE loss.

        Expert samples are labeled 1, policy samples 0; both inputs are
        raw logits of shape ``[batch, 1]``.
        """
        assert len(policy_output.shape) == 2 and len(expert_output.shape) == 2
        assert policy_output.shape[1] == 1 and expert_output.shape[1] == 1
        zeros = torch.zeros(policy_output.shape[0], 1, device=self.device)
        ones = torch.ones(expert_output.shape[0], 1, device=self.device)
        loss_policy = F.binary_cross_entropy_with_logits(
            policy_output, zeros, reduction="mean"
        )
        loss_expert = F.binary_cross_entropy_with_logits(
            expert_output, ones, reduction="mean"
        )
        return (loss_policy + loss_expert) / 2

    def update(
        self,
        dataloader,
        val_dataloader,
        num_epochs,
        verbose: bool,
        log: bool,
        global_epoch: int,
    ):
        """Run ``num_epochs`` of discriminator training.

        Each batch row concatenates one policy observation and one expert
        observation of equal width (policy half first). ``val_dataloader``
        is accepted for interface compatibility but unused here.
        """
        for epoch in range(num_epochs):
            if verbose:
                progress_bar = tqdm(total=len(dataloader))
                progress_bar.set_description(f"Epoch {epoch}")
            metrics = dict()
            metrics.update({"dac/global_epoch": global_epoch})
            # Initialize so the post-loop metrics.update() is safe even if
            # the dataloader yields no batches.
            logs = {}
            for batch in dataloader:
                batch = batch.float().to(self.device)
                # Split the concatenated row into its two halves.
                policy_obs = batch[:, : batch.shape[1] // 2]
                expert_obs = batch[:, batch.shape[1] // 2 :]
                # GAN loss on discriminator logits.
                policy_output = self.model(policy_obs)
                expert_output = self.model(expert_obs)
                gan_loss = self.compute_gan_loss(policy_output, expert_output)
                # WGAN-GP gradient penalty on random interpolates between
                # policy and expert observations.
                alpha = torch.rand(policy_obs.shape[0], 1, device=self.device)
                interpolated = alpha * policy_obs + (1 - alpha) * expert_obs
                interpolated.requires_grad_(True)
                output = self.model(interpolated)
                grad = torch.autograd.grad(
                    outputs=output,
                    inputs=interpolated,
                    grad_outputs=torch.ones_like(output),
                    create_graph=True,
                    retain_graph=True,
                )[0]
                assert len(grad.shape) == 2
                grad_penalty = ((grad.norm(2, dim=1) - 1) ** 2).mean()
                # Combined loss and optimizer step.
                loss = gan_loss + self.GRAD_PENALTY_WEIGHT * grad_penalty
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                logs = {
                    "dac/loss": loss.item(),
                    "dac/gan_loss": gan_loss.item(),
                    "dac/grad_penalty": grad_penalty.item(),
                }
                if verbose:
                    progress_bar.update(1)
                    progress_bar.set_postfix(**logs)
            if verbose:
                progress_bar.close()
            metrics.update(logs)  # only log last batch
            if log:
                wandb.log(metrics)

    @torch.no_grad()
    def reward_fn(self, obs):
        """Map observations to imitation rewards.

        Args:
            obs: float tensor of shape ``[bs, obs_dim]``. Image inputs
                (``[bs, c, h, w]``) are rejected.

        Returns:
            Reward tensor of shape ``[bs, 1]``: raw logits for the "airl"
            style, ``-log(1 - sigmoid(logit))`` for "gail"; optionally
            standardized to ``target_std``.

        Raises:
            NotImplementedError: for image observations or an unknown
                ``reward_style``.
        """
        assert (
            len(obs.shape) == 2 or len(obs.shape) == 4
        ), obs.shape  # [bs, obs_dim] or [bs, c, h, w]
        if len(obs.shape) == 4:
            # Was `assert False`, which is silently stripped under -O.
            raise NotImplementedError("does not support image input")
        if self.reward_style == "airl":
            r = self.model(obs)
        elif self.reward_style == "gail":
            # Clamp via epsilon to keep the log finite when sigmoid -> 1.
            r = -torch.log(1.0 - torch.sigmoid(self.model(obs)) + 1e-8)
        else:
            raise NotImplementedError
        if self.normalize_reward:
            # Batch-standardize, then rescale to the configured std.
            r = (r - r.mean()) / (r.std() + 1e-6) * self.target_std
        return r