``` @property
def additional_filters(self):
    """Return the rendered filter for this stream, or None when no filter is set.

    When ``self.filter`` is truthy it is used as a file name under
    ``FILTERS_DIR``. The file's text is passed to ``self.filter_format``
    together with ``self.current_version`` and that result is returned.
    """
    if not self.filter:
        return None
    filter_filepath = FILTERS_DIR / self.filter
    # The context manager closes the handle even if reading raises; the
    # previous implementation left the file open.
    with open(filter_filepath, "r") as filter_file:
        template = filter_file.read()
    return self.filter_format(template, self.current_version)
@property
def http_headers(self) -> dict:
    """Build the HTTP headers sent with every request.

    ``User-Agent`` is included only when the config has a ``user_agent``
    key; ``Content-Type`` is always ``application/xml``.
    """
    settings = self.config
    # Seed the dict with the optional header so it stays first in key order.
    result = (
        {"User-Agent": settings.get("user_agent")}
        if "user_agent" in settings
        else {}
    )
    result["Content-Type"] = "application/xml"
    return result
def get_next_page_token(
    self, response: requests.Response, previous_token: Optional[Any]
) -> Optional[Any]:
    """Return the token for the next page; this implementation always returns None.

    Neither ``response`` nor ``previous_token`` is inspected.
    """
    next_token = None
    return next_token
def get_url_params(
    self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
    """Return the query-string parameters for a request.

    ``page`` is set when ``next_page_token`` is truthy. When the stream has a
    replication key, ``sort`` is set to ``"asc"`` and ``order_by`` to that key.
    ``context`` is not used.
    """
    query: dict = {}
    if next_page_token:
        query["page"] = next_page_token
    if self.replication_key:
        query.update(sort="asc", order_by=self.replication_key)
    return query
# Overridden from the RESTStream class, which assumes a JSON payload; the request
# here is built with a raw "data" body instead of a "json" dictionary.
def prepare_request(
    self, context: Optional[dict], next_page_token: Optional[Any]
) -> requests.PreparedRequest:
    """Assemble and prepare the HTTP request for one call.

    The method, URL, query parameters and payload come from the stream's own
    hooks. Headers start from ``self.http_headers`` and are extended with the
    authenticator's headers when an authenticator is present. The payload is
    passed through ``data=`` so it is sent as a raw body.
    """
    verb = self.rest_method
    target: str = self.get_url(context)
    query: dict = self.get_url_params(context, next_page_token)
    payload = self.prepare_request_payload(context, next_page_token)

    merged_headers = self.http_headers
    auth = self.authenticator
    if auth:
        # auth_headers may be None/empty; fall back to an empty mapping.
        merged_headers.update(auth.auth_headers or {})

    unprepared = requests.Request(
        method=verb,
        url=target,
        params=query,
        headers=merged_headers,
        data=payload,
    )
    prepared = self.requests_session.prepare_request(unprepared)
    return cast(requests.PreparedRequest, prepared)
def prepare_request_payload(
    self, context: Optional[dict], next_page_token: Optional[Any]
) -> Optional[str]:
    """Build the XML request body for this stream.

    The body is a single ``<call>`` element carrying the stream's method, a
    ``<credentials>`` element, and, when available, the stream's filters.
    ``context`` and ``next_page_token`` are not used.

    Returns:
        The XML document as a string (not a dict, despite the base-class
        naming), ready to be sent as the raw request body.
    """
    from xml.sax.saxutils import escape

    def attr(value: Any) -> str:
        # Attribute values are wrapped in double quotes in the template, so
        # &, <, > and " must be escaped or the XML becomes malformed (e.g. a
        # password containing '&' or '"').
        return escape(str(value), {'"': "&quot;"})

    # Adaptive, for our purposes, uses the same endpoint with a slightly
    # different body for each stream: credentials, a method, and optionally
    # filters.
    # additional_filters is None when no filter is configured; substitute an
    # empty string so the literal text "None" is not embedded in the body.
    # Filters are inserted unescaped because they are XML fragments.
    filters = self.additional_filters or ""
    body = """
<call method="{method}" callerName="meltano">
<credentials login="{username}" password="{password}" instanceCode="{instanceCode}"/>
{filters}
</call>
""".format(
        method=attr(self.method),
        filters=filters,
        username=attr(self.username),
        password=attr(self.password),
        instanceCode=attr(self.instance_code),
    )
    return body
def parse_response(self, response: requests.Response) -> Iterable[dict]:
    """Yield the records that ``extract_xml`` produces from the response text."""
    payload = response.text
    yield from self.extract_xml(payload)
def extract_xml(self, response_text: str) -> Generator[Any, None, None]:
    """Parse an XML response and yield the records it contains.

    If the root element's ``success`` attribute is not ``"true"``, two error
    messages are logged (a fixed message, then the raw response text) and
    nothing is yielded. Otherwise the records come from
    ``make_version_records`` when the stream method is ``"exportVersions"``,
    and from ``parse_worksheet_data`` for every other method.
    """
    document = ET.fromstring(response_text)

    if document.get('success') != "true":
        self.logger.error("something went wrong")
        self.logger.error(response_text)
        return

    if self.method == "exportVersions":
        # Children of the first grandchild element are handed over as a list.
        yield from self.make_version_records(list(document[0][0]))
    else:
        # The first child's text is split into lines for the worksheet parser.
        yield from self.parse_worksheet_data(list(document[0].text.splitlines()))
def make_version_records(self, lines: list):
data = []
for version in lines:
data.append({"id": version.get("id")…