98 lines
3.1 KiB
Python
98 lines
3.1 KiB
Python
|
# just a highly simplified wrapper over pysnmp
|
||
|
|
||
|
from pysnmp.hlapi import NoSuchInstance, Integer, Integer32, Counter32, OctetString, ObjectType, ObjectIdentity, getCmd, nextCmd, SnmpEngine, CommunityData, UdpTransportTarget, ContextData
|
||
|
|
||
|
# for bulk requests. I find large requests crash the ilo (lol)
|
||
|
MAX_CHUNK = 64
|
||
|
|
||
|
|
||
|
class SnmpConfiguration(object):
|
||
|
def __init__(self, engine: SnmpEngine, auth: CommunityData, transport: UdpTransportTarget, context: ContextData):
|
||
|
self.engine = engine
|
||
|
self.auth = auth
|
||
|
self.transport = transport
|
||
|
self.context = context
|
||
|
|
||
|
|
||
|
class AgentError(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
class EngineError(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
def process_value(var_bind) -> str | int | float | None:
|
||
|
val = var_bind[1]
|
||
|
if isinstance(val, NoSuchInstance):
|
||
|
return None
|
||
|
elif isinstance(val, Integer) or isinstance(val, Integer32) or isinstance(val, Counter32):
|
||
|
return int(val)
|
||
|
elif isinstance(val, OctetString):
|
||
|
return str(val)
|
||
|
else:
|
||
|
print('i dunno:', val)
|
||
|
print('unhandled type:', type(val))
|
||
|
return val.prettyPrint()
|
||
|
|
||
|
|
||
|
def snmp_get(c: SnmpConfiguration, oid: str | tuple[int]) -> str | int | float | None:
|
||
|
""" gets a single oid """
|
||
|
return snmp_get_all(c, oid)[0]
|
||
|
|
||
|
|
||
|
def snmp_get_all(c: SnmpConfiguration, *oid: str | tuple[int]) -> list[str | int | float | None]:
|
||
|
""" does a bulk request """
|
||
|
if len(oid) > MAX_CHUNK:
|
||
|
# split it up to not break the target
|
||
|
results = []
|
||
|
results.extend(snmp_get_all(c, *oid[:MAX_CHUNK]))
|
||
|
results.extend(snmp_get_all(c, *oid[MAX_CHUNK:]))
|
||
|
return results
|
||
|
|
||
|
# do snmp get
|
||
|
it = getCmd(c.engine, c.auth, c.transport, c.context, *[ObjectType(ObjectIdentity(x)) for x in oid])
|
||
|
engine_err, agent_err, agent_err_index, var_binds = next(it)
|
||
|
|
||
|
# handle errors
|
||
|
if engine_err:
|
||
|
raise EngineError(engine_err)
|
||
|
elif agent_err:
|
||
|
raise AgentError('%s at %s' % (agent_err.prettyPrint(), var_binds[int(agent_err_index) - 1] if agent_err_index else '?'))
|
||
|
|
||
|
# debugging
|
||
|
# for var_bind in var_binds:
|
||
|
# print('got snmp:', ' = '.join([x.prettyPrint() for x in var_bind]))
|
||
|
|
||
|
return [process_value(vb) for vb in var_binds]
|
||
|
|
||
|
|
||
|
def snmp_walk(c: SnmpConfiguration, base_oid: str) -> list[tuple[str, str | int | float | None]]:
|
||
|
""" does a walk within the range of a specified base oid """
|
||
|
results = []
|
||
|
|
||
|
# do snmp get
|
||
|
it = nextCmd(c.engine, c.auth, c.transport, c.context, ObjectType(ObjectIdentity(base_oid)))
|
||
|
within = True
|
||
|
while within:
|
||
|
engine_err, agent_err, agent_err_index, var_binds = next(it)
|
||
|
|
||
|
# handle errors
|
||
|
if engine_err:
|
||
|
raise EngineError(engine_err)
|
||
|
elif agent_err:
|
||
|
raise AgentError('%s at %s' % (agent_err.prettyPrint(), var_binds[int(agent_err_index) - 1] if agent_err_index else '?'))
|
||
|
|
||
|
for var_bind in var_binds:
|
||
|
# print(var_bind)
|
||
|
oid = str(var_bind[0].getOid())
|
||
|
if oid.startswith(base_oid):
|
||
|
results.append((oid, process_value(var_bind)))
|
||
|
else:
|
||
|
within = False
|
||
|
|
||
|
if len(var_binds) == 0:
|
||
|
within = False
|
||
|
|
||
|
return results
|