Python Integration
This guide shows you how to integrate IPBot into your Python applications for geolocation, security, and personalization.
Installation
Section titled “Installation”IPBot is a REST API, so no special package is required. However, we recommend using the requests library:
pip install requestsFor async applications, install aiohttp:
pip install aiohttpBasic Usage
Section titled “Basic Usage”Simple Request
Section titled “Simple Request”import requests
def get_ip_info(ip=None): """Get IP information from IPBot""" url = f"https://api.ipbot.com/{ip}" if ip else "https://api.ipbot.com/" response = requests.get(url) response.raise_for_status() return response.json()
# Get caller's IP infoinfo = get_ip_info()print(f"Country: {info['location']['country']}")print(f"City: {info['location']['city']}")
# Lookup specific IPinfo = get_ip_info("8.8.8.8")print(f"Organization: {info['network']['org']}")Django Integration
Section titled “Django Integration”Add to your Django project’s settings or utilities:
import requestsfrom django.core.cache import cachefrom typing import Optional, Dict, Any
class IPBotClient: """IPBot API client for Django"""
API_URL = "https://api.ipbot.com" CACHE_TIMEOUT = 86400 # 24 hours
@staticmethod def get_client_ip(request) -> str: """Extract client IP from Django request""" x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0].strip() else: ip = request.META.get('REMOTE_ADDR') return ip or 'unknown'
def get_info(self, ip: Optional[str] = None) -> Dict[str, Any]: """Get IP info with caching""" if not ip: raise ValueError("IP address is required")
cache_key = f"ipbot_{ip}" cached_data = cache.get(cache_key)
if cached_data: return cached_data
try: response = requests.get(f"{self.API_URL}/{ip}", timeout=5) response.raise_for_status() data = response.json()
# Cache the response cache.set(cache_key, data, self.CACHE_TIMEOUT) return data
except requests.RequestException as e: return {"error": str(e)}
def get_security_score(self, ip: str) -> int: """Get just the risk score for an IP""" info = self.get_info(ip) return info.get("security", {}).get("risk_score", 0)
# Singleton instanceipbot = IPBotClient()Django View Example
Section titled “Django View Example”from django.shortcuts import renderfrom django.http import JsonResponse, HttpResponseForbiddenfrom .utils.ipbot import ipbot
def home(request): """Home page with location-based content""" client_ip = ipbot.get_client_ip(request) ip_info = ipbot.get_info(client_ip)
context = { "country": ip_info.get("location", {}).get("country", "Unknown"), "country_code": ip_info.get("location", {}).get("country_code", ""), "city": ip_info.get("location", {}).get("city", "Unknown"), "risk_score": ip_info.get("security", {}).get("risk_score", 0), }
return render(request, "home.html", context)
def api_ip_info(request): """API endpoint to get caller's IP info""" client_ip = ipbot.get_client_ip(request) ip_info = ipbot.get_info(client_ip) return JsonResponse(ip_info)
@login_requireddef protected_view(request): """Example: Block high-risk IPs from protected areas""" client_ip = ipbot.get_client_ip(request) ip_info = ipbot.get_info(client_ip)
risk_score = ip_info.get("security", {}).get("risk_score", 0)
if risk_score > 70: return HttpResponseForbidden( "Access denied. Your IP has been flagged as high-risk." )
# ... rest of view logicDjango Middleware
Section titled “Django Middleware”from django.http import HttpResponseForbiddenfrom .utils.ipbot import ipbot
class IPBotSecurityMiddleware: """Middleware to block high-risk IPs"""
def __init__(self, get_response): self.get_response = get_response # IP ranges to always allow (e.g., office IPs) self.allowed_ips = [] # Add your trusted IPs here
def __call__(self, request): # Skip for static files and admin if request.path.startswith('/static/') or request.path.startswith('/admin/'): return self.get_response(request)
client_ip = ipbot.get_client_ip(request)
# Skip if IP is in allowed list if client_ip in self.allowed_ips: return self.get_response(request)
ip_info = ipbot.get_info(client_ip)
if ip_info.get("error"): # Allow on API errors to avoid blocking legitimate users return self.get_response(request)
# Check risk score risk_score = ip_info.get("security", {}).get("risk_score", 0)
if risk_score > 70: return HttpResponseForbidden( "Access denied. Your IP has been flagged as high-risk." )
# Check for proxy/VPN if ip_info.get("security", {}).get("is_proxy"): # You might want to log this or handle differently pass
# Add IP info to request for use in views request.ipbot_info = ip_info
return self.get_response(request)Add to your settings:
MIDDLEWARE = [ # ... 'django.middleware.security.SecurityMiddleware', 'yourapp.middleware.ipbot_middleware.IPBotSecurityMiddleware', # ...]Flask Integration
Section titled “Flask Integration”Basic Flask Usage
Section titled “Basic Flask Usage”from flask import Flask, request, jsonify, render_template_stringimport requestsfrom functools import wraps
app = Flask(__name__)
def get_ip_info(ip=None): """Get IP information from IPBot""" url = f"https://api.ipbot.com/{ip}" if ip else "https://api.ipbot.com/" try: response = requests.get(url, timeout=2) response.raise_for_status() return response.json() except requests.RequestException: return None
def get_client_ip(): """Get client IP from Flask request""" if request.headers.get('X-Forwarded-For'): return request.headers.get('X-Forwarded-For').split(',')[0].strip() return request.remote_addr
def block_high_risk(f): """Decorator to block high-risk IPs""" @wraps(f) def decorated_function(*args, **kwargs): client_ip = get_client_ip() ip_info = get_ip_info(client_ip)
if ip_info and ip_info.get('security', {}).get('risk_score', 0) > 70: return jsonify({"error": "High risk IP detected"}), 403
return f(*args, **kwargs) return decorated_function
@app.route('/')def home(): client_ip = get_client_ip() ip_info = get_ip_info(client_ip)
country = ip_info.get('location', {}).get('country', 'Unknown') if ip_info else 'Unknown'
return render_template_string(f""" <h1>Hello from {country}!</h1> <p>Your IP: {client_ip}</p> """)
@app.route('/api/protected')@block_high_riskdef protected(): return jsonify({"message": "Access granted"})
@app.route('/api/ip-info')def ip_info(): client_ip = get_client_ip() info = get_ip_info(client_ip) return jsonify(info or {"error": "Failed to get IP info"})
if __name__ == '__main__': app.run(debug=True)Flask with Caching
Section titled “Flask with Caching”from flask import Flaskimport requestsfrom cachetools import TTLCacheimport hashlib
app = Flask(__name__)
# Cache with 24-hour TTLip_cache = TTLCache(maxsize=10000, ttl=86400)
def get_ip_info_cached(ip): """Get IP info with caching""" cache_key = hashlib.md5(ip.encode()).hexdigest()
if cache_key in ip_cache: return ip_cache[cache_key]
url = f"https://api.ipbot.com/{ip}" response = requests.get(url, timeout=2) data = response.json()
ip_cache[cache_key] = data return dataFastAPI Integration
Section titled “FastAPI Integration”from fastapi import FastAPI, Request, HTTPException, Dependsfrom fastapi.responses import JSONResponseimport httpxfrom typing import Optionalfrom pydantic import BaseModel
app = FastAPI()
class IPInfo(BaseModel): ip: str country: str city: str risk_score: int is_proxy: bool
async def get_ip_info(ip: str) -> dict: """Get IP info using async httpx""" async with httpx.AsyncClient() as client: response = await client.get(f"https://api.ipbot.com/{ip}", timeout=5.0) response.raise_for_status() return response.json()
async def get_client_ip(request: Request) -> str: """Extract client IP from request""" forwarded = request.headers.get("X-Forwarded-For") if forwarded: return forwarded.split(",")[0].strip() return request.client.host if request.client else "unknown"
@app.get("/")async def root(request: Request): """Home endpoint with location info""" client_ip = await get_client_ip(request) ip_info = await get_ip_info(client_ip)
return { "message": f"Hello from {ip_info.get('location', {}).get('country', 'Unknown')}!", "ip": client_ip, "location": ip_info.get("location", {}) }
@app.get("/api/ip-info")async def api_ip_info(request: Request): """Get caller's IP information""" client_ip = await get_client_ip(request) return await get_ip_info(client_ip)
@app.get("/api/lookup/{ip}")async def lookup_ip(ip: str): """Lookup a specific IP address""" try: return await get_ip_info(ip) except httpx.HTTPStatusError as e: raise HTTPException(status_code=e.response.status_code, detail="Failed to lookup IP")
@app.post("/api/secure-action")async def secure_action(request: Request): """Example endpoint that blocks high-risk IPs""" client_ip = await get_client_ip(request) ip_info = await get_ip_info(client_ip)
risk_score = ip_info.get("security", {}).get("risk_score", 0)
if risk_score > 50: raise HTTPException( status_code=403, detail=f"High risk IP detected (score: {risk_score})" )
return {"message": "Action completed successfully", "risk_score": risk_score}
# Middleware for IP-based blocking@app.middleware("http")async def add_ip_info(request: Request, call_next): """Add IP info to request state""" client_ip = await get_client_ip(request)
try: ip_info = await get_ip_info(client_ip) request.state.ip_info = ip_info request.state.client_ip = client_ip
# Block high-risk IPs globally risk_score = ip_info.get("security", {}).get("risk_score", 0) if risk_score > 80: return JSONResponse( status_code=403, content={"error": "Access denied due to high risk score"} ) except Exception: # Continue on error to avoid blocking all traffic pass
response = await call_next(request) return responseAsync Python (aiohttp)
Section titled “Async Python (aiohttp)”import aiohttpfrom typing import Dict, Any, Optionalimport asyncio
class AsyncIPBotClient: """Async IPBot client"""
def __init__(self, session: Optional[aiohttp.ClientSession] = None): self.session = session self.api_url = "https://api.ipbot.com"
async def get_info(self, ip: Optional[str] = None) -> Dict[str, Any]: """Get IP info asynchronously""" if self.session is None: self.session = aiohttp.ClientSession() close_session = True else: close_session = False
try: url = f"{self.api_url}/{ip}" if ip else self.api_url async with self.session.get(url) as response: response.raise_for_status() return await response.json() finally: if close_session: await self.session.close()
async def get_multiple(self, ips: list[str]) -> list[Dict[str, Any]]: """Get info for multiple IPs concurrently""" tasks = [self.get_info(ip) for ip in ips] return await asyncio.gather(*tasks)
# Usageasync def main(): client = AsyncIPBotClient()
# Single lookup info = await client.get_info("8.8.8.8") print(f"Country: {info['location']['country']}")
# Multiple lookups ips = ["8.8.8.8", "1.1.1.1", "208.67.222.222"] results = await client.get_multiple(ips)
for ip, data in zip(ips, results): print(f"{ip}: {data['location']['country']}")
asyncio.run(main())CLI Tool
Section titled “CLI Tool”Create a command-line tool for IP lookups:
#!/usr/bin/env python3"""ipbot-cli - Command line tool for IPBot API"""
import argparseimport jsonimport sysimport requests
def format_output(data, format_type="json"): """Format output for display""" if format_type == "json": return json.dumps(data, indent=2) elif format_type == "pretty": location = data.get("location", {}) security = data.get("security", {}) network = data.get("network", {})
output = [ f"IP: {data.get('ip', 'Unknown')}", f"Location: {location.get('city', 'Unknown')}, {location.get('region', 'Unknown')}, {location.get('country', 'Unknown')}", f"Organization: {network.get('org', 'Unknown')}", f"ASN: {network.get('asn', 'Unknown')}", f"Risk Score: {security.get('risk_score', 0)}/100", f"Threat Level: {security.get('threat_level', 'Unknown')}", f"Is Proxy: {security.get('is_proxy', False)}", f"Is Datacenter: {security.get('is_datacenter', False)}", ] return "\n".join(output) return str(data)
def main(): parser = argparse.ArgumentParser(description="IPBot CLI Tool") parser.add_argument("ip", nargs="?", help="IP address to lookup (defaults to your IP)") parser.add_argument("-f", "--format", choices=["json", "pretty"], default="pretty", help="Output format") parser.add_argument("-r", "--risk-only", action="store_true", help="Show only risk information")
args = parser.parse_args()
url = f"https://api.ipbot.com/{args.ip}" if args.ip else "https://api.ipbot.com/"
try: response = requests.get(url) response.raise_for_status() data = response.json()
if args.risk_only: security = data.get("security", {}) output_data = { "ip": data.get("ip"), "risk_score": security.get("risk_score"), "risk_reasons": security.get("risk_reasons"), "threat_level": security.get("threat_level"), "is_proxy": security.get("is_proxy"), "is_datacenter": security.get("is_datacenter"), } else: output_data = data
print(format_output(output_data, args.format))
except requests.RequestException as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1)
if __name__ == "__main__": main()Error Handling Best Practices
Section titled “Error Handling Best Practices”import requestsfrom requests.adapters import HTTPAdapterfrom urllib3.util.retry import Retry
def create_session(): """Create a requests session with retry logic""" session = requests.Session()
retry = Retry( total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504], )
adapter = HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter)
return session
def get_ip_info_safe(ip: str) -> dict: """Get IP info with comprehensive error handling""" session = create_session()
try: response = session.get( f"https://api.ipbot.com/{ip}", timeout=5 ) response.raise_for_status() return response.json()
except requests.Timeout: return {"error": "Request timed out", "code": "TIMEOUT"} except requests.ConnectionError: return {"error": "Connection failed", "code": "CONNECTION_ERROR"} except requests.HTTPError as e: return { "error": f"HTTP error: {e.response.status_code}", "code": f"HTTP_{e.response.status_code}" } except requests.RequestException as e: return {"error": str(e), "code": "REQUEST_ERROR"} except ValueError: return {"error": "Invalid JSON response", "code": "INVALID_JSON"}