ecourts.ecourt
1import os 2import requests 3from captcha import Captcha, CaptchaError 4from collections.abc import Iterator 5from tempfile import mkstemp 6import time 7from urllib.parse import urlencode 8from entities import Court, CaseType, Case, Hearing, Order, ActType, CauseList 9from entities.hearing import UnexpandableHearing 10import datetime 11import csv 12from parsers.orders import parse_orders 13from parsers.options import parse_options 14from parsers.cases import parse_cases 15from parsers.cause_lists import parse_cause_lists 16 17 18class RetryException(Exception): 19 pass 20 21 22class ECourt: 23 # TODO: Get this dynamically at init 24 CSRF_MAGIC_PARAMS = { 25 "__csrf_magic": "sid:e2b2b2ae5e125f3174066a01e182acd472a256ea,1723269931" 26 } 27 BASE_URL = "https://hcservices.ecourts.gov.in/ecourtindiaHC" 28 29 def __init__(self, court: Court): 30 self.session = requests.Session() 31 self.court = court 32 self.captcha = Captcha(self.session) 33 self.max_attempts = 15 34 35 def set_max_attempts(self, attempts): 36 self.max_attempts = attempts 37 38 def attempts(self): 39 return self.max_attempts 40 41 def url(self, path, queryParams={}): 42 if len(queryParams) > 0: 43 return self.BASE_URL + path + "?" + urlencode(queryParams) 44 return self.BASE_URL + path 45 46 def validate_response(self, r): 47 t = r.text.upper()[0:30] 48 if "ERROR" in t: 49 raise ValueError("Got invalid result") 50 if "INVALID CAPTCHA" in t: 51 raise CaptchaError() 52 53 def apimethod(path, court=False, csrf=True, action=None): 54 def decorator(func): 55 def inner(self, *args, **kwargs): 56 params = {"action_code": action} if action else {} 57 if court: 58 params |= self.court.queryParams() 59 if csrf: 60 params |= self.CSRF_MAGIC_PARAMS 61 62 attempts = self.attempts() 63 64 while attempts > 0: 65 try: 66 extra_params = func(self, *args, **kwargs) or {} 67 if len(extra_params) == 0: 68 params |= kwargs 69 else: 70 params |= extra_params 71 if 'captcha' in params and params['captcha'] == None: 72 print("Ran out captcha attempts") 73 sys.exit(1) 74 75 response = self.session.post(self.url(path), data=params, allow_redirects=False, timeout=(5, 10)) 76 self.validate_response(response) 77 if response.status_code == 302 and response.headers['location'].startswith("errormsg"): 78 raise ValueError("Error: " + response.headers['location']) 79 response.raise_for_status() 80 attempts = 0 81 except (CaptchaError, ValueError, requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: 82 time.sleep(1) 83 attempts -= 1 84 if attempts == 0: 85 raise RetryException(f"Ran out of {self.attempts()} attempts, still failed") 86 except (requests.exceptions.HTTPError) as e: 87 time.sleep(1) 88 attempts -= 1 89 if attempts == 0: 90 raise RetryException(f"Ran out of {self.attempts()} attempts, still failed") 91 92 response.encoding = "utf-8-sig" 93 return response.text 94 95 return inner 96 97 return decorator 98 99 @apimethod( 100 path="/cases/s_orderdate_qry.php", court=True, csrf=True, action="showRecords" 101 ) 102 def _get_orders(self, *args, **kwargs): 103 return { 104 "captcha": self.captcha.solve(), 105 } 106 107 @apimethod( 108 path="/cases/s_show_business.php", court=True, csrf=False, action=None 109 ) 110 def _get_hearing_details(self, *args, **kwargs): 111 pass 112 113 def expandHearing(self, hearing: Hearing, case: Case): 114 """ 115 Expand a hearing object with more details from the daily business list 116 """ 117 if case.case_number == None: 118 raise ValueError("Require a case.case_number to expand hearing details") 119 if hearing.court_no == None or hearing.srno == None or hearing.date == None: 120 raise UnexpandableHearing() 121 params = { 122 "case_number1": case.case_number, 123 } | hearing.expandParams() 124 from parsers.hearing_details import parse_hearing_details 125 hearing.details = parse_hearing_details(self._get_hearing_details(**params)) 126 127 def downloadOrder(self, order: Order, court_case: Case, filename: str): 128 # display_pdf.php?filename, caseno=AB/3142/2018 | cCode=1 | appFlag= | cino=GAHC010225502018 |state_code=6 129 assert order.filename != None 130 assert court_case.case_type != None 131 assert court_case.registration_number != None 132 assert court_case.cnr_number != None 133 queryParams = { 134 "filename": order.filename, 135 "caseno": f"{court_case.case_type}/{court_case.registration_number}", 136 "cCode": self.court.court_code or "1", 137 "state_code": self.court.state_code, 138 "cino": court_case.cnr_number, 139 } 140 url = self.url("/cases/display_pdf.php", queryParams) 141 r = self.session.get(url) 142 with open(filename, "wb") as f: 143 f.write(r.content) 144 145 # Search for cases by Case Type | 🚧WIP | Case Type, Year†, Pending/Disposed 146 @apimethod( 147 path="/cases/s_casetype_qry.php", action="showRecords", court=True 148 ) 149 def _search_cases_by_case_type(self, case_type, status, search_year, **kwargs): 150 assert status in ["Pending", "Disposed"] 151 152 r = { 153 "captcha": self.captcha.solve(), 154 "f": status, 155 "case_type": str(case_type) 156 } 157 if search_year: 158 r["search_year"] = search_year 159 return r 160 161 def CaseType(self, case_type: str, status: str, year: int = None): 162 cc = self.court.court_code or "1" 163 url = self.url(f"/cases/s_casetype.php?state_cd={self.court.state_code}&dist_cd=1&court_code={cc}") 164 self.session.get(url) 165 result = self._search_cases_by_case_type(case_type, status, year) 166 return parse_cases(result) 167 168 # Search for cases by Act Type 169 @apimethod( 170 path="/cases/s_actwise_qry.php", action="showRecords", court=True, csrf=True 171 ) 172 def _search_cases_by_act_type(self, act_type: str, status: str, **kwargs): 173 """ 174 Search a specific ecourt for cases by act type under 175 which they were registered. Requires a act type 176 """ 177 178 return { 179 "captcha": self.captcha.solve(), 180 "actcode": act_type, 181 "f": status 182 } 183 184 def ActType(self, act_type: str, status: str): 185 result = self._search_cases_by_act_type(act_type, status) 186 return parse_cases(result) 187 188 @apimethod(path="/cases/o_civil_case_history.php", court=True, action=None, csrf=False) 189 def getCaseHistory(self, case: Case, **kwargs): 190 return case.expandParams() 191 192 # TODO: Store the case_type if available inside the case itself 193 @apimethod(path="/cases/case_no_qry.php", action="showRecords", court=True) 194 def searchSingleCase(self, registration_number: str, case_type: str): 195 return { 196 "captcha": self.captcha.solve(), 197 "case_type": case_type, 198 "case_no": registration_number.split("/")[0], 199 "rgyear": registration_number.split("/")[1], 200 "caseNoType": "new", 201 "displayOldCaseNo": "NO" 202 } 203 204 def expand_case(self, case: Case): 205 from parsers.case_details import CaseDetails 206 html = self.getCaseHistory(case) 207 newcase = CaseDetails(html).case 208 if case.case_number: 209 newcase.case_number = case.case_number 210 return newcase 211 212 def getOrdersOnDate(self, date: datetime.date): 213 d = date.strftime("%d-%m-%Y") 214 return parse_orders(self._get_orders(from_date=d, to_date=d)) 215 216 def getCaseTypes(self) -> Iterator[CaseType]: 217 for option in parse_options(self._get_case_type())[1:]: 218 yield CaseType(code=int(option[0]), description=option[1], court=self.court) 219 220 @apimethod( 221 path="/cases/s_casetype_qry.php", csrf=True, court=True, action="fillCaseType" 222 ) 223 def _get_case_type(self, *args, **kwargs): 224 pass 225 226 @apimethod( 227 path="/cases/s_actwise_qry.php", csrf=False, court=True, action="fillActType" 228 ) 229 def _get_act_type(self, query: str, **kwargs): 230 return { 231 "search_act": query 232 } 233 234 def getActTypes(self, query="") -> Iterator[ActType]: 235 for option in parse_options(self._get_act_type(query))[1:]: 236 yield ActType(code=int(option[0]), description=option[1], court=self.court) 237 238 def getCauseLists(self, date: datetime.date) -> Iterator[CauseList]: 239 raw_res = self._get_cause_lists(date) 240 return parse_cause_lists(raw_res) 241 242 @apimethod( 243 path="/cases/highcourt_causelist_qry.php", 244 court=True, 245 action="pulishedCauselist", 246 csrf=False 247 ) 248 def _get_cause_lists(self, date: datetime.date, **kwargs): 249 dt_str = date.strftime("%d-%m-%Y") 250 return { 251 "causelist_dt": dt_str, 252 }
class
RetryException(builtins.Exception):
Common base class for all non-exit exceptions.
class
ECourt:
23class ECourt: 24 # TODO: Get this dynamically at init 25 CSRF_MAGIC_PARAMS = { 26 "__csrf_magic": "sid:e2b2b2ae5e125f3174066a01e182acd472a256ea,1723269931" 27 } 28 BASE_URL = "https://hcservices.ecourts.gov.in/ecourtindiaHC" 29 30 def __init__(self, court: Court): 31 self.session = requests.Session() 32 self.court = court 33 self.captcha = Captcha(self.session) 34 self.max_attempts = 15 35 36 def set_max_attempts(self, attempts): 37 self.max_attempts = attempts 38 39 def attempts(self): 40 return self.max_attempts 41 42 def url(self, path, queryParams={}): 43 if len(queryParams) > 0: 44 return self.BASE_URL + path + "?" + urlencode(queryParams) 45 return self.BASE_URL + path 46 47 def validate_response(self, r): 48 t = r.text.upper()[0:30] 49 if "ERROR" in t: 50 raise ValueError("Got invalid result") 51 if "INVALID CAPTCHA" in t: 52 raise CaptchaError() 53 54 def apimethod(path, court=False, csrf=True, action=None): 55 def decorator(func): 56 def inner(self, *args, **kwargs): 57 params = {"action_code": action} if action else {} 58 if court: 59 params |= self.court.queryParams() 60 if csrf: 61 params |= self.CSRF_MAGIC_PARAMS 62 63 attempts = self.attempts() 64 65 while attempts > 0: 66 try: 67 extra_params = func(self, *args, **kwargs) or {} 68 if len(extra_params) == 0: 69 params |= kwargs 70 else: 71 params |= extra_params 72 if 'captcha' in params and params['captcha'] == None: 73 print("Ran out captcha attempts") 74 sys.exit(1) 75 76 response = self.session.post(self.url(path), data=params, allow_redirects=False, timeout=(5, 10)) 77 self.validate_response(response) 78 if response.status_code == 302 and response.headers['location'].startswith("errormsg"): 79 raise ValueError("Error: " + response.headers['location']) 80 response.raise_for_status() 81 attempts = 0 82 except (CaptchaError, ValueError, requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: 83 time.sleep(1) 84 attempts -= 1 85 if attempts == 0: 86 raise RetryException(f"Ran out of {self.attempts()} attempts, still failed") 87 except (requests.exceptions.HTTPError) as e: 88 time.sleep(1) 89 attempts -= 1 90 if attempts == 0: 91 raise RetryException(f"Ran out of {self.attempts()} attempts, still failed") 92 93 response.encoding = "utf-8-sig" 94 return response.text 95 96 return inner 97 98 return decorator 99 100 @apimethod( 101 path="/cases/s_orderdate_qry.php", court=True, csrf=True, action="showRecords" 102 ) 103 def _get_orders(self, *args, **kwargs): 104 return { 105 "captcha": self.captcha.solve(), 106 } 107 108 @apimethod( 109 path="/cases/s_show_business.php", court=True, csrf=False, action=None 110 ) 111 def _get_hearing_details(self, *args, **kwargs): 112 pass 113 114 def expandHearing(self, hearing: Hearing, case: Case): 115 """ 116 Expand a hearing object with more details from the daily business list 117 """ 118 if case.case_number == None: 119 raise ValueError("Require a case.case_number to expand hearing details") 120 if hearing.court_no == None or hearing.srno == None or hearing.date == None: 121 raise UnexpandableHearing() 122 params = { 123 "case_number1": case.case_number, 124 } | hearing.expandParams() 125 from parsers.hearing_details import parse_hearing_details 126 hearing.details = parse_hearing_details(self._get_hearing_details(**params)) 127 128 def downloadOrder(self, order: Order, court_case: Case, filename: str): 129 # display_pdf.php?filename, caseno=AB/3142/2018 | cCode=1 | appFlag= | cino=GAHC010225502018 |state_code=6 130 assert order.filename != None 131 assert court_case.case_type != None 132 assert court_case.registration_number != None 133 assert court_case.cnr_number != None 134 queryParams = { 135 "filename": order.filename, 136 "caseno": f"{court_case.case_type}/{court_case.registration_number}", 137 "cCode": self.court.court_code or "1", 138 "state_code": self.court.state_code, 139 "cino": court_case.cnr_number, 140 } 141 url = self.url("/cases/display_pdf.php", queryParams) 142 r = self.session.get(url) 143 with open(filename, "wb") as f: 144 f.write(r.content) 145 146 # Search for cases by Case Type | 🚧WIP | Case Type, Year†, Pending/Disposed 147 @apimethod( 148 path="/cases/s_casetype_qry.php", action="showRecords", court=True 149 ) 150 def _search_cases_by_case_type(self, case_type, status, search_year, **kwargs): 151 assert status in ["Pending", "Disposed"] 152 153 r = { 154 "captcha": self.captcha.solve(), 155 "f": status, 156 "case_type": str(case_type) 157 } 158 if search_year: 159 r["search_year"] = search_year 160 return r 161 162 def CaseType(self, case_type: str, status: str, year: int = None): 163 cc = self.court.court_code or "1" 164 url = self.url(f"/cases/s_casetype.php?state_cd={self.court.state_code}&dist_cd=1&court_code={cc}") 165 self.session.get(url) 166 result = self._search_cases_by_case_type(case_type, status, year) 167 return parse_cases(result) 168 169 # Search for cases by Act Type 170 @apimethod( 171 path="/cases/s_actwise_qry.php", action="showRecords", court=True, csrf=True 172 ) 173 def _search_cases_by_act_type(self, act_type: str, status: str, **kwargs): 174 """ 175 Search a specific ecourt for cases by act type under 176 which they were registered. Requires a act type 177 """ 178 179 return { 180 "captcha": self.captcha.solve(), 181 "actcode": act_type, 182 "f": status 183 } 184 185 def ActType(self, act_type: str, status: str): 186 result = self._search_cases_by_act_type(act_type, status) 187 return parse_cases(result) 188 189 @apimethod(path="/cases/o_civil_case_history.php", court=True, action=None, csrf=False) 190 def getCaseHistory(self, case: Case, **kwargs): 191 return case.expandParams() 192 193 # TODO: Store the case_type if available inside the case itself 194 @apimethod(path="/cases/case_no_qry.php", action="showRecords", court=True) 195 def searchSingleCase(self, registration_number: str, case_type: str): 196 return { 197 "captcha": self.captcha.solve(), 198 "case_type": case_type, 199 "case_no": registration_number.split("/")[0], 200 "rgyear": registration_number.split("/")[1], 201 "caseNoType": "new", 202 "displayOldCaseNo": "NO" 203 } 204 205 def expand_case(self, case: Case): 206 from parsers.case_details import CaseDetails 207 html = self.getCaseHistory(case) 208 newcase = CaseDetails(html).case 209 if case.case_number: 210 newcase.case_number = case.case_number 211 return newcase 212 213 def getOrdersOnDate(self, date: datetime.date): 214 d = date.strftime("%d-%m-%Y") 215 return parse_orders(self._get_orders(from_date=d, to_date=d)) 216 217 def getCaseTypes(self) -> Iterator[CaseType]: 218 for option in parse_options(self._get_case_type())[1:]: 219 yield CaseType(code=int(option[0]), description=option[1], court=self.court) 220 221 @apimethod( 222 path="/cases/s_casetype_qry.php", csrf=True, court=True, action="fillCaseType" 223 ) 224 def _get_case_type(self, *args, **kwargs): 225 pass 226 227 @apimethod( 228 path="/cases/s_actwise_qry.php", csrf=False, court=True, action="fillActType" 229 ) 230 def _get_act_type(self, query: str, **kwargs): 231 return { 232 "search_act": query 233 } 234 235 def getActTypes(self, query="") -> Iterator[ActType]: 236 for option in parse_options(self._get_act_type(query))[1:]: 237 yield ActType(code=int(option[0]), description=option[1], court=self.court) 238 239 def getCauseLists(self, date: datetime.date) -> Iterator[CauseList]: 240 raw_res = self._get_cause_lists(date) 241 return parse_cause_lists(raw_res) 242 243 @apimethod( 244 path="/cases/highcourt_causelist_qry.php", 245 court=True, 246 action="pulishedCauselist", 247 csrf=False 248 ) 249 def _get_cause_lists(self, date: datetime.date, **kwargs): 250 dt_str = date.strftime("%d-%m-%Y") 251 return { 252 "causelist_dt": dt_str, 253 }
def
apimethod(path, court=False, csrf=True, action=None):
54 def apimethod(path, court=False, csrf=True, action=None): 55 def decorator(func): 56 def inner(self, *args, **kwargs): 57 params = {"action_code": action} if action else {} 58 if court: 59 params |= self.court.queryParams() 60 if csrf: 61 params |= self.CSRF_MAGIC_PARAMS 62 63 attempts = self.attempts() 64 65 while attempts > 0: 66 try: 67 extra_params = func(self, *args, **kwargs) or {} 68 if len(extra_params) == 0: 69 params |= kwargs 70 else: 71 params |= extra_params 72 if 'captcha' in params and params['captcha'] == None: 73 print("Ran out captcha attempts") 74 sys.exit(1) 75 76 response = self.session.post(self.url(path), data=params, allow_redirects=False, timeout=(5, 10)) 77 self.validate_response(response) 78 if response.status_code == 302 and response.headers['location'].startswith("errormsg"): 79 raise ValueError("Error: " + response.headers['location']) 80 response.raise_for_status() 81 attempts = 0 82 except (CaptchaError, ValueError, requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: 83 time.sleep(1) 84 attempts -= 1 85 if attempts == 0: 86 raise RetryException(f"Ran out of {self.attempts()} attempts, still failed") 87 except (requests.exceptions.HTTPError) as e: 88 time.sleep(1) 89 attempts -= 1 90 if attempts == 0: 91 raise RetryException(f"Ran out of {self.attempts()} attempts, still failed") 92 93 response.encoding = "utf-8-sig" 94 return response.text 95 96 return inner 97 98 return decorator
def
expandHearing(self, hearing: entities.hearing.Hearing, case: entities.case.Case):
114 def expandHearing(self, hearing: Hearing, case: Case): 115 """ 116 Expand a hearing object with more details from the daily business list 117 """ 118 if case.case_number == None: 119 raise ValueError("Require a case.case_number to expand hearing details") 120 if hearing.court_no == None or hearing.srno == None or hearing.date == None: 121 raise UnexpandableHearing() 122 params = { 123 "case_number1": case.case_number, 124 } | hearing.expandParams() 125 from parsers.hearing_details import parse_hearing_details 126 hearing.details = parse_hearing_details(self._get_hearing_details(**params))
Expand a hearing object with more details from the daily business list
def
downloadOrder( self, order: entities.order.Order, court_case: entities.case.Case, filename: str):
128 def downloadOrder(self, order: Order, court_case: Case, filename: str): 129 # display_pdf.php?filename, caseno=AB/3142/2018 | cCode=1 | appFlag= | cino=GAHC010225502018 |state_code=6 130 assert order.filename != None 131 assert court_case.case_type != None 132 assert court_case.registration_number != None 133 assert court_case.cnr_number != None 134 queryParams = { 135 "filename": order.filename, 136 "caseno": f"{court_case.case_type}/{court_case.registration_number}", 137 "cCode": self.court.court_code or "1", 138 "state_code": self.court.state_code, 139 "cino": court_case.cnr_number, 140 } 141 url = self.url("/cases/display_pdf.php", queryParams) 142 r = self.session.get(url) 143 with open(filename, "wb") as f: 144 f.write(r.content)
def
CaseType(self, case_type: str, status: str, year: int = None):
162 def CaseType(self, case_type: str, status: str, year: int = None): 163 cc = self.court.court_code or "1" 164 url = self.url(f"/cases/s_casetype.php?state_cd={self.court.state_code}&dist_cd=1&court_code={cc}") 165 self.session.get(url) 166 result = self._search_cases_by_case_type(case_type, status, year) 167 return parse_cases(result)
def
getCaseHistory(self, *args, **kwargs):
56 def inner(self, *args, **kwargs): 57 params = {"action_code": action} if action else {} 58 if court: 59 params |= self.court.queryParams() 60 if csrf: 61 params |= self.CSRF_MAGIC_PARAMS 62 63 attempts = self.attempts() 64 65 while attempts > 0: 66 try: 67 extra_params = func(self, *args, **kwargs) or {} 68 if len(extra_params) == 0: 69 params |= kwargs 70 else: 71 params |= extra_params 72 if 'captcha' in params and params['captcha'] == None: 73 print("Ran out captcha attempts") 74 sys.exit(1) 75 76 response = self.session.post(self.url(path), data=params, allow_redirects=False, timeout=(5, 10)) 77 self.validate_response(response) 78 if response.status_code == 302 and response.headers['location'].startswith("errormsg"): 79 raise ValueError("Error: " + response.headers['location']) 80 response.raise_for_status() 81 attempts = 0 82 except (CaptchaError, ValueError, requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: 83 time.sleep(1) 84 attempts -= 1 85 if attempts == 0: 86 raise RetryException(f"Ran out of {self.attempts()} attempts, still failed") 87 except (requests.exceptions.HTTPError) as e: 88 time.sleep(1) 89 attempts -= 1 90 if attempts == 0: 91 raise RetryException(f"Ran out of {self.attempts()} attempts, still failed") 92 93 response.encoding = "utf-8-sig" 94 return response.text
def
searchSingleCase(self, *args, **kwargs):
56 def inner(self, *args, **kwargs): 57 params = {"action_code": action} if action else {} 58 if court: 59 params |= self.court.queryParams() 60 if csrf: 61 params |= self.CSRF_MAGIC_PARAMS 62 63 attempts = self.attempts() 64 65 while attempts > 0: 66 try: 67 extra_params = func(self, *args, **kwargs) or {} 68 if len(extra_params) == 0: 69 params |= kwargs 70 else: 71 params |= extra_params 72 if 'captcha' in params and params['captcha'] == None: 73 print("Ran out captcha attempts") 74 sys.exit(1) 75 76 response = self.session.post(self.url(path), data=params, allow_redirects=False, timeout=(5, 10)) 77 self.validate_response(response) 78 if response.status_code == 302 and response.headers['location'].startswith("errormsg"): 79 raise ValueError("Error: " + response.headers['location']) 80 response.raise_for_status() 81 attempts = 0 82 except (CaptchaError, ValueError, requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: 83 time.sleep(1) 84 attempts -= 1 85 if attempts == 0: 86 raise RetryException(f"Ran out of {self.attempts()} attempts, still failed") 87 except (requests.exceptions.HTTPError) as e: 88 time.sleep(1) 89 attempts -= 1 90 if attempts == 0: 91 raise RetryException(f"Ran out of {self.attempts()} attempts, still failed") 92 93 response.encoding = "utf-8-sig" 94 return response.text