
I have implemented all solutions in Python. In some cases I have used very powerful Networkx library to solve the problems.
This year I have participated in the fantastic project of Advent of Code. It works like a typical advent calendar where every day you have different task to do. However this calendar gives you a programming problems to implement. My solutions can be found on github repo.
Please find below the screenshot of this calendar:

I have decided to extend my pi radio and create a prototype of a button with which I could play defined url and stop it as well. I reused my code of mpd client and my button’s experiment to create a handle of single button. As long as the button is pressed the LED diode is on. When the button press is short (i.e. less than 1s) then radio starts playing hardcoded url. When the button press is longer than 1s then radio stops:
#!/usr/bin/env python
import mpd_client
import RPi.GPIO as gpio
import time
import commands
def main_loop():
gpio.setmode(gpio.BCM)
gpio.setup(14, gpio.IN)
gpio.setup(15, gpio.OUT)
try:
button_down = 0
while True:
button = gpio.input(14)
if not button:
gpio.output(15, gpio.HIGH)
if not button_down:
button_down = time.time()
else:
gpio.output(15, gpio.LOW)
now = time.time()
if button_down:
diff = now - button_down
if diff 1.0 and diff 3.0 and diff 5.0:
restart()
button_down = 0
time.sleep(100.0/1000.0)
except KeyboardInterrupt, e:
gpio.cleanup()
def play():
url = 'http://stream4.nadaje.com:8002/muzo'
ip = '127.0.0.1'
port = 6600
print 'Connecting to mpd server:',ip,':',port
print 'Playing: ', url
mpd_client.play(ip, port, url)
def stop():
ip = '127.0.0.1'
port = 6600
print 'Connecting to mpd server:',ip,':',port
print 'Stoping: '
mpd_client.stop(ip, port)
def network_restart():
cmd = 'service networking restart'
print 'network restart', cmd
(status, output) = commands.getstatusoutput(cmd)
if status:
sys.stderr.write(output)
sys.exit(status)
print output
def restart():
cmd = 'shutdown -r 0'
print 'restart', cmd
(status, output) = commands.getstatusoutput(cmd)
if status:
sys.stderr.write(output)
sys.exit(status)
print output
main_loop()
The code above contains also some “service” features. When the button will be pressed longer than 3s the network will be restarted, and when it will be pressed longer than 5s the full device will restart.
Moreover with a help of this great tutorial I installed my script as a systemd service which starts automatically with Raspberry pi.
The prototype looks like this:

For the real implementation I decided to modify a circuit. I removed all the elements and stay only with tact switch. I connected directly first pin of the button to the gpio pin 21 and second one to the gpio ground. There was also a need to modify slightly a setup code:
gpio.setup(21, gpio.IN, pull_up_down=gpio.PUD_UP)
the rest was not changed. With this line I turn on the raspberry’s pull up resistor so it can be safely eliminated from the circuit.
The script can be found on github.
The version with the single button which I placed on the top of the cover
:
I extended the circuit I made with LED and this time I added the tact switch. The LED is connected through 470Ohm resistor to pin 15. The button’s first pin is connected to the GND and second one to the pin 14 and through 10kOhm resistor to 3.3V output. When the button is pressed it closes the circuit so the current flows from 3.3V through the resistor to the ground and this is why the pin 14 returns false in input.
In my example when the button is pressed it turns on the LED for 1 second:
#!/usr/bin/env python
import RPi.GPIO as gpio
import time
gpio.setmode(gpio.BCM)
gpio.setup(14, gpio.IN)
gpio.setup(15, gpio.OUT)
while True:
button = gpio.input(14)
if not button:
gpio.output(15, gpio.HIGH)
time.sleep(1)
gpio.output(15, gpio.LOW)
while not button:
button = gpio.input(14)
The tact switch:

Recently I have decided to play around with GPIO of my Raspberry Pi. My goal is to find some useful application of LED diodes and tact switches and probably I will implement some additional features to pi radio.
But initially I just want to only turn on the LED diode to understand how it really works. So I created simple circuit with LED and resistor to protect this diode. The output pin gives me 3.3V and my diode allows only 2.2V and 20mA. So the resistor should have 55Ohms, however I used a bit greater resistance 470Ohm (it was the lowest I could find).
Here is the numbering of GPIO pins: 
I wrote the python script which uses pin 14 and 15 to turn on alternately
the diodes green and red every 1 second:
#!/usr/bin/env python import RPi.GPIO as gpio import time gpio.setmode(gpio.BCM) gpio.setup(14, gpio.OUT) gpio.setup(15, gpio.OUT) while True: gpio.output(14, gpio.HIGH) gpio.output(15, gpio.LOW) time.sleep(1) gpio.output(14, gpio.LOW) gpio.output(15, gpio.HIGH) time.sleep(1)
And here is the presentation:

I wrote a script which invokes ffmpeg and convert commands to create the gif file.
usage: output.gif input.mp4 [--fps fps]
#!/usr/bin/env python
import sys
import os
import commands
def make_gif(input_f, output_f, fps):
tmp = os.path.join('.', 'tmp');
if not os.path.exists(tmp):
os.mkdir(tmp)
to_jpg(tmp, input_f, fps)
to_gif(tmp, output_f, fps)
clean(tmp)
def to_jpg(tmp, input_f, fps):
cmd = 'ffmpeg -i ' + input_f + ' -r ' + str(fps) + ' ' + os.path.join(tmp, 'f%04d.jpg')
print 'dumping:', cmd
(status, output) = commands.getstatusoutput(cmd)
if status:
sys.stderr.write(output)
sys.exit(status)
print output
def to_gif(tmp, output_f, fps):
cmd = 'convert -delay ' + str(100/fps) + ' -loop 0 ' + os.path.join(tmp, '*.jpg') + ' ' + output_f
print 'creating gif:', cmd
(status, output) = commands.getstatusoutput(cmd)
if status:
sys.stderr.write(output)
sys.exit(status)
print output
def clean(tmp):
cmd = 'rm -rf '+ tmp
print 'cleaning:', cmd
(status, output) = commands.getstatusoutput(cmd)
if status:
sys.stderr.write(output)
sys.exit(status)
print output
def num(s):
try:
return int(s)
except ValueError:
return 0
def main():
args = sys.argv[1:]
if not args:
print "usage: output.gif input.mp4 [--fps fps]";
sys.exit(1)
if len(args) 1 and args[0] == '--fps':
fps = num(args[1])
if fps == 0:
fps = 5
del args[0:2]
print 'Converting',input_f,'to',output_f,'with',fps,'fps'
make_gif(input_f, output_f, fps)
if __name__ == "__main__":
main()
The script can be found on github.
I wrote a python script to connect to my pi radio and invoke stop or start command to play radio stream.
usage: {start|stop} [--url url] [--ip ip] [--port port]
#!/usr/bin/env python
import sys
from mpd import MPDClient
def connect(ip, port):
client = MPDClient()
client.timeout = 60 # network timeout in seconds (floats allowed), default: None
client.idletimeout = None
client.connect(ip, port)
return client
def stop(ip, port):
client = connect(ip, port)
client.clear()
return client
def play(ip, port, url):
client = stop(ip, port)
client.add(url)
client.play()
def num(s):
try:
return int(s)
except ValueError:
return 0
def main():
args = sys.argv[1:]
if not args:
print "usage: {start|stop} [--url url] [--ip ip] [--port port]";
sys.exit(1)
start = False
if args[0] == 'start':
start = True
elif args[0] == 'stop':
start = False
else:
print 'command {start|stop} not specified'
sys.exit(1)
del args[0]
url = 'http://stream4.nadaje.com:8002/muzo'
if args and args[0] == '--url':
url = args[1]
del args[0:2]
ip = '192.168.0.3'
if args and args[0] == '--ip':
ip = args[1]
del args[0:2]
port = 6600
if args and args[0] == '--port':
port = num(args[1])
del args[0:2]
print 'Connecting to mpd server:',ip,':',port
if start:
print 'Playing: ', url
play(ip, port, url)
else:
print 'Stopping'
stop(ip, port)
if __name__ == "__main__":
main()
The script can be found on github.
I wrote a Python script to check the channels of the surrounding wifi networks. Recently I was facing some issues with the quality of signal so I decided to check what’s going on with the help of the powerful linux tool. In scanning mode iwlist provides huge amount of information but in fact I just needed to know which networks occupied which channels. So I wrote an analyser which reads in standard input (e.g. using pipe) the output of iwlist.
usage: iwlist wlp3s0 scan |./analyser.py
The analyser lists the channels with frequency and below the names, link qualities and signal power of the networks using this channel:
name: quality: power: Channel: 1 (2.412 GHz) Name18272346 36/70 -74 dBm Name20053835 27/70 -83 dBm Name3742A1 26/70 -84 dBm Name4123456 25/70 -85 dBm Name57005901 23/70 -87 dBm Name6D2A47 22/70 -88 dBm Channel: 11 (2.462 GHz) Name119496 50/70 -60 dBm Name20682504 39/70 -71 dBm Channel: 13 (2.472 GHz) Name10682504 38/70 -72 dBm Channel: 52 (5.26 GHz) Name10682504 32/70 -78 dBm
#!/usr/bin/python
import sys
import re
def num(s):
try:
return int(s)
except ValueError:
return 0
def first(x):
match = re.search(r'Channel: (\d+)', x)
if match:
return num(match.group(1))
return x
def last(x):
power = x[2]
match = re.search(r'-(\d+) dBm', power)
if match:
return match.group(1)
return power
def print_cells(cells):
sorted_cells = sorted(cells, key=last)
for cell in sorted_cells:
print '\t\t','\t'.join(cell)
def process_channels(channels):
print_cells([(crop('name:', 20), crop('quality:', 10), crop('power:', 10))])
sorted_keys = sorted(channels, key=first)
for channel in sorted_keys:
print channel
print_cells(channels[channel])
def crop(name, chars):
if len(name) > chars:
return name[:chars]
else:
return name.ljust(chars)
def main():
channels = {}
channel = ''
name = ''
quality = ''
level = ''
for line in sys.stdin:
to_add = False
match = re.search(r'Frequency:(.+) GHz \(Channel (\d+)\)', line)
if match:
channel = 'Channel: '+ match.group(2) + ' ('+ match.group(1)+' GHz)'
name = ''
quality = ''
level = ''
match = re.search(r'Quality=(\S+)\s+Signal level=(.+) dBm', line)
if match:
quality = crop(match.group(1), 10)
level = crop(match.group(2) + ' dBm', 10)
match = re.search(r'ESSID:"(.+)"', line)
if match:
name = crop(match.group(1), 20)
to_add = True
if to_add:
if channel in channels:
channels[channel].append((name, quality, level))
else:
channels[channel] = [(name, quality, level)]
process_channels(channels)
if __name__ == "__main__":
main()
The script can be found on github.
I wrote a Python script to make a backup of the PostgreSQL database.
usage: [--dumpdir dir] [--clean days] database user
where:
dumpdir – destination directory for dump
clean – number of days back in the history to keep the previous dumps
database – database name
user – database user
#!/usr/bin/python
import sys
import os
import commands
from time import gmtime, strftime
def get_size(dumpname):
cmd = 'du -sh ' + dumpname
(status, output) = commands.getstatusoutput(cmd)
if status:
sys.stderr.write(output)
sys.exit(status)
return output
def dump_db(database, user, dumpname):
cmd = 'pg_dump ' + database + ' -Z6 -U ' + user + ' -f ' + dumpname # + ' --table main.placeholder'
print 'dumping:', cmd
(status, output) = commands.getstatusoutput(cmd)
if status:
sys.stderr.write(output)
sys.exit(status)
print 'dump created: ' + get_size(dumpname)
def clean_old_dumps(days, dumpdir, namebase):
cmd = "find " + dumpdir + " -maxdepth 1 -mtime +" + str(days) + " -name \"" + namebase + "*.gz\" -exec rm -rf '{}' ';'"
print 'removing:', cmd
(status, output) = commands.getstatusoutput(cmd)
if status:
sys.stderr.write(output)
sys.exit(status)
def get_time_name(time):
return strftime("%Y-%m-%d_%H_%M_%S", time)
def num(s):
try:
return int(s)
except ValueError:
return 0
def main():
args = sys.argv[1:]
if not args:
print "usage: [--dumpdir dir] [--clean days] database user";
sys.exit(1)
dumpdir = '/tmp'
if args[0] == '--dumpdir':
dumpdir = args[1]
del args[0:2]
clean = 0
if len(args) > 1 and args[0] == '--clean':
clean = num(args[1])
del args[0:2]
if len(args) < 2:
print "error: must specify database and user"
sys.exit(1)
database = args[0]
user = args[1]
namebase = 'dump_' + database + '_'
dumpname = os.path.join(dumpdir, namebase + get_time_name(gmtime()) + '.sql.gz')
dump_db(database, user, dumpname)
if clean:
print "Removing dumps made before (days):", clean
clean_old_dumps(clean, dumpdir, namebase)
if __name__ == "__main__":
main()
The script can be found on github.
I wrote a simple script to fast check open ports in my local network. This script skips the VPN connections.
Usage:
usage: [--max ip] port [port ...]
where:
ip is the maximum ip to scan
#!/usr/bin/python
import socket
from netifaces import interfaces, ifaddresses, AF_INET
import re
import sys
def get_my_ip():
result = []
print 'retrieving ip addresses ...'
for ifaceName in interfaces():
addresses = [i['addr'] for i in ifaddresses(ifaceName).setdefault(AF_INET, [{'addr':'No IP addr'}] )]
if not addresses:
continue;
address = addresses[0]
if address == 'No IP addr':
continue
if ifaceName == 'lo':
print 'Skip local: ', address
continue
match = re.search(r'tap', ifaceName)
if match:
print 'Skip vpn: ', address
continue
result.append(address)
print 'found:',', '.join(result)
return result
def check_ips(ip, maxip, ports):
match = re.search(r'(\d+.\d+.+\d+).\d+', ip)
if match:
ip_base = match.group(1)
ip_base = ip_base+'.'
ports_str = ', '.join(str(x) for x in ports)
print 'checking network: ',ip_base+'1-'+str(maxip)+':'+ports_str
for i in range(1,maxip):
ip_to_check = ip_base+str(i)
check_ip_ports(ip_to_check, ports)
def check_ip_ports(ip, ports):
for port in ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((ip,port))
if result:
continue
print 'Ip:', ip, 'has open port', port
def num(s):
try:
return int(s)
except ValueError:
return 0
def main():
args = sys.argv[1:]
if not args:
print "usage: [--max ip] port [port ...]";
sys.exit(1)
maxip = 0
if args[0] == '--max':
maxip = num(args[1])
del args[0:2]
if not maxip:
maxip = 255
if len(args) == 0:
print "error: must specify one or more ports"
sys.exit(1)
ports = []
for arg in args:
port = num(arg)
if port:
ports.append(port)
if len(ports) == 0:
print "error: must specify one or more ports"
sys.exit(1)
ips = get_my_ip();
for ip in ips:
check_ips(ip, maxip, ports)
if __name__ == "__main__":
main()
The script can be found on github.