- Python 99.1%
- Makefile 0.9%
|
|
||
|---|---|---|
| docs | ||
| examples | ||
| src/ethernetip | ||
| tests | ||
| .gitignore | ||
| CHANGELOG.md | ||
| LICENSE | ||
| Makefile | ||
| noxfile.py | ||
| pyproject.toml | ||
| README.md | ||
| setup.cfg | ||
| setup.py | ||
python-ethernetip
A Python implementation of the EtherNet/IP protocol for industrial automation and control systems.
🎯 Overview
This library provides a clean, Pythonic interface to communicate with industrial devices using the EtherNet/IP (Ethernet Industrial Protocol) standard. It supports both explicit messaging (request/response) and implicit messaging (I/O connections) commonly used in industrial automation.
What is EtherNet/IP?
EtherNet/IP is an industrial network protocol that adapts the Common Industrial Protocol (CIP) to standard Ethernet. It's widely used in factory automation, process control, and building automation systems.
✨ Features
- 🔌 Explicit Messaging - Request/response communication for configuration and data access
- ⚡ Implicit I/O - High-speed cyclic data exchange for real-time control
- 🔍 Device Discovery - Network scanning and device identification
- 🧵 Thread-Safe - Built-in locking mechanisms for concurrent operations
- ⚙️ Configurable - Centralized configuration for timeouts, buffer sizes, and I/O parameters
- 📝 Well-Documented - Clear API with comprehensive docstrings
- 🐍 Python 2/3 Compatible - Works with Python 2.7 and Python 3.7+
📦 Installation
Release Installation
pip install ethernetip
From Source
git clone https://codeberg.org/paperwork/python-ethernetip.git
cd python-ethernetip
pip install -e .
Dependencies
dpkt- Packet creation and parsing
pip install dpkt
🚀 Quick Start
Device Discovery
Scan your network for EtherNet/IP devices:
import ethernetip
# Create EtherNet/IP instance
enip = ethernetip.EtherNetIP()
# Create explicit connection
conn = enip.explicit_conn("192.168.1.100")
# Scan network
devices = conn.scanNetwork("192.168.1.255", timeout=5)
for device in devices:
print(f"Found: {device.product_name.decode()}")
Read Device Identity
import ethernetip
# Connect to device
enip = ethernetip.EtherNetIP("192.168.1.100")
conn = enip.explicit_conn()
# Register session
conn.registerSession()
# Read device attributes (Identity Object, Instance 1)
for attr in range(1, 8):
status, data = conn.getAttrSingle(
ethernetip.CIP_OBJ_IDENTITY,
instance=1,
attribute=attr
)
if status == 0:
print(f"Attribute {attr}: {data}")
I/O Connection (Real-time Data Exchange)
import ethernetip
import time
# Setup connection
enip = ethernetip.EtherNetIP("192.168.1.100")
conn = enip.explicit_conn()
conn.registerSession()
# Register I/O assemblies (1 byte input, 1 byte output)
input_data = enip.registerAssembly(
ethernetip.EtherNetIP.ENIP_IO_TYPE_INPUT,
size=1,
instance=101,
conn=conn
)
output_data = enip.registerAssembly(
ethernetip.EtherNetIP.ENIP_IO_TYPE_OUTPUT,
size=1,
instance=100,
conn=conn
)
# Start I/O communication
enip.startIO()
conn.sendFwdOpenReq(
inputinst=101,
outputinst=100,
configinst=1
)
conn.produce()
# Control outputs
try:
while True:
# Set output bit 0
output_data[0] = True
time.sleep(0.5)
# Read input bit 0
print(f"Input bit 0: {input_data[0]}")
output_data[0] = False
time.sleep(0.5)
except KeyboardInterrupt:
pass
# Cleanup
conn.stopProduce()
conn.sendFwdCloseReq(101, 100, 1)
enip.stopIO()
📚 API Overview
Main Classes
EtherNetIP
Main class for managing EtherNet/IP communications.
enip = ethernetip.EtherNetIP(ip="192.168.1.100")
Methods:
explicit_conn(ipaddr)- Create an explicit messaging connectionregisterAssembly(iotype, size, inst, conn)- Register I/O assemblystartIO()/stopIO()- Start/stop I/O communicationlistIDUDP(ipaddr, timeout)- Send ListIdentity request via UDP
EtherNetIPExpConnection
Explicit connection for request/response messaging and I/O.
Session Management:
registerSession()- Establish sessionunregisterSession()- Close session
Device Information:
listID()- Get device identitylistServices()- Get available servicesscanNetwork(broadcast, timeout)- Scan for devices
Attribute Services:
getAttrSingle(class, instance, attribute)- Read single attributesetAttrSingle(class, instance, attribute, data)- Write single attributegetAttrAll(class, instance)- Read all attributes
I/O Connection:
sendFwdOpenReq(...)- Open I/O connectionsendFwdCloseReq(...)- Close I/O connectionproduce()- Start producing output datastopProduce()- Stop producing output data
CIP Object IDs
Common CIP object class IDs are provided as constants:
ethernetip.CIP_OBJ_IDENTITY # 0x01 - Device identity
ethernetip.CIP_OBJ_MESSAGE_ROUTER # 0x02 - Message router
ethernetip.CIP_OBJ_ASSEMBLY # 0x04 - Assembly object
ethernetip.CIP_OBJ_CONNECTION # 0x05 - Connection object
ethernetip.CIP_OBJ_TCPIP # 0xF5 - TCP/IP interface
ethernetip.CIP_OBJ_ETHERNET_LINK # 0xF6 - Ethernet link
# ... and more
⚙️ Configuration
Global Configuration
Customize default behavior using the global config object:
from ethernetip import config
# Adjust timeouts
config.DEFAULT_TIMEOUT = 15 # seconds
config.IO_SOCKET_SELECT_TIMEOUT = 3 # seconds
# Adjust buffer sizes
config.RECV_BUFFER_SIZE = 2048 # bytes
config.RECV_BUFFER_SIZE_LARGE = 8192 # bytes
# Adjust I/O timing
config.UDP_IO_RPI_DEFAULT = 50 # ms (faster I/O updates)
config.UDP_IO_MIN_RPI = 5 # ms
Custom Configuration Class
For application-specific settings:
from ethernetip import EtherNetIPConfig
class ProductionConfig(EtherNetIPConfig):
"""Optimized for production environment"""
DEFAULT_TIMEOUT = 30
UDP_IO_RPI_DEFAULT = 100
RECV_BUFFER_SIZE = 4096
# Apply configuration
import ethernetip
ethernetip.config = ProductionConfig()
Available Configuration Options
| Parameter | Default | Description |
|---|---|---|
DEFAULT_TIMEOUT |
10 | Default socket timeout (seconds) |
IO_SOCKET_SELECT_TIMEOUT |
2 | I/O socket select timeout (seconds) |
RECV_BUFFER_SIZE |
1024 | Standard receive buffer size (bytes) |
RECV_BUFFER_SIZE_LARGE |
4096 | Large receive buffer size (bytes) |
UDP_IO_RPI_DEFAULT |
100 | Default I/O update rate (milliseconds) |
UDP_IO_MIN_RPI |
8 | Minimum I/O update rate (milliseconds) |
FWD_OPEN_RPI_MULTIPLIER |
1000 | RPI conversion multiplier (ms to µs) |
TICK_TIME |
250 | Timeout ticks for unconnected send |
📖 Examples
Check out the examples/ directory for more detailed examples:
- example1.py - Complete workflow: discovery, configuration, I/O
- dlrscanner.py - Network scanning example
🛠️ Development
This library uses nox to trigger tests, doc generation and code style check.
make nox
Code Style
This project follows PEP 8 with some modifications (see setup.cfg):
Building Documentation
make docs
🤝 Contributing
Contributions are welcome! Here's how you can help:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
Guidelines
- Follow the existing code style
- Add tests for new features
- Update documentation as needed
- Ensure all tests pass before submitting
📝 License
This project is licensed under the MIT License - see the LICENSE file for details.
🔗 Resources
- EtherNet/IP Specification: ODVA Website
- CIP Common Services: ODVA CIP Documentation
- Issue Tracker: Codeberg Issues
Version History
See CHANGELOG.md for complete version history.
Made with ❤️ for Industrial Automation
If you find this library useful, please consider giving it a ⭐ on Codeberg!