Source code for sequenza.izip

from heapq import merge


class zip_coordinates:
    '''
    Merge two iterators of records keyed by chromosome/position.

    Every record must be a tuple ``(coordinates, data)`` where
    ``coordinates`` is a tuple ``(chromosome, position_start,
    position_end)`` and ``data`` is a tuple.

    The records of ``item2`` act as windows. A record of ``item1`` is
    emitted when it is on the same chromosome as the current window and
    its start position satisfies
    ``window_start <= position_start < window_end``; the emitted value is
    ``(item1_coordinates, item1_data + window_data)``. Only the start
    coordinate of ``item1`` records is taken into account, and ``item1``
    records lying before the current window are skipped.

    Both arguments must be iterators (``next()`` is called on them
    directly) and must expose ``close()`` if :meth:`close` is used.
    '''

    # Unique object that next() can never return, so the callable
    # iterator built in __iter__ only stops when StopIteration is raised.
    _sentinel = object()

    def __init__(self, item1, item2):
        self.c2 = item2
        try:
            first_window = next(self.c2)
        except StopIteration:
            # Empty window source: use a placeholder window that no
            # record can match; the first call to next() will then stop
            # as soon as it tries to fetch another window.
            first_window = ((None, 0, 0), (None, ))
        coordinates, self._last_data = first_window
        self._chromosome, self._last_window_s, self._last_window_e = \
            coordinates
        self.c1 = item1
        self._last_chromosome = None

    def _next_window(self):
        '''Load the next window of ``item2`` as the current window.'''
        coordinates, self._last_data = next(self.c2)
        self._chromosome, self._last_window_s, self._last_window_e = \
            coordinates

    def __next__(self):
        return self.next()

    def next(self):
        '''
        Return the next ``item1`` record that falls inside a window.

        Raises StopIteration when either source is exhausted.
        '''
        self.c1_line = next(self.c1)
        while True:
            coordinates = self.c1_line[0]
            if self._chromosome == coordinates[0]:
                self._last_chromosome = self._chromosome
                position = coordinates[1]
                if self._last_window_s <= position < self._last_window_e:
                    return (coordinates, self.c1_line[1] + self._last_data)
                elif position < self._last_window_s:
                    # Record lies before the window: skip it.
                    self.c1_line = next(self.c1)
                elif position >= self._last_window_e:
                    # Record lies past the window: move the window on.
                    self._next_window()
            elif self._last_chromosome != self._chromosome and \
                    self._last_chromosome is not None:
                # The windows already moved to a chromosome that differs
                # from the last one matched: advance the records.
                self.c1_line = next(self.c1)
            else:
                # Windows are still on the last matched chromosome (or
                # nothing matched yet): advance the windows.
                self._next_window()

    def close(self):
        '''Close both sources; the second is closed even if the first fails.'''
        try:
            self.c1.close()
        finally:
            self.c2.close()

    def __iter__(self):
        return (iter(self.next, self._sentinel))
def chrompos_keyfunc(line, i):
    '''
    Build the sort key ``(chromosome_number, position, i)``.

    ``line`` is a coordinate tuple whose first item is the chromosome
    name and whose second item is the position. Every ``'chr'``
    substring is removed from the name, which is then read as a decimal
    integer or, failing that, as a base-36 integer (so ``'X'`` is 33).
    ``i`` is appended unchanged as a tie-breaker.

    Raises ValueError if the name is not a valid base-36 number either.
    '''
    name = line[0].replace('chr', '')
    try:
        rank = int(name)
    except ValueError:
        rank = int(name, 36)
    return (rank, line[1], i)


def decorated_item(f, i):
    '''
    Yield ``(sort_key, record)`` for every record of ``f``.

    The key is computed by ``chrompos_keyfunc`` from the record's
    coordinates (its first item) and the source index ``i``.
    '''
    for record in f:
        sort_key = chrompos_keyfunc(record[0], i)
        yield (sort_key, record)


def merge_items(item1, item2):
    '''
    Merge two coordinate-sorted record streams into one sorted stream.

    Records are ordered with ``heapq.merge`` on their decorated key; on
    equal chromosome and position the record of ``item1`` comes first.
    The undecorated records are yielded.
    '''
    first = decorated_item(item1, 1)
    second = decorated_item(item2, 2)
    for _key, record in merge(first, second):
        yield record
def zip_fast(item1, item2):
    '''
    Merge two chromosome/position ordered record streams.

    The records are interleaved with ``merge_items`` (built on
    ``heapq.merge``), which assumes both inputs are position ordered
    and list the chromosomes in the same order. Unlike
    ``zip_coordinates`` every position of either input is returned, and
    records that overlap are grouped together.

    Each record is ``(coordinates, data)`` with ``coordinates`` being
    ``(chromosome, position_start, position_end)``. Yielded values are
    ``(coordinates, (first, second))`` where ``first`` is the first data
    item of the currently held record and ``second`` is the first data
    item of the record overlapping it, or ``None`` when the held record
    is emitted alone.

    Empty inputs yield nothing and a single record is emitted alone.
    '''
    merged = merge_items(item1, item2)
    try:
        store_line = next(merged)
    except StopIteration:
        # A bare next() here would surface as RuntimeError inside a
        # generator (PEP 479); an empty merge simply yields nothing.
        return
    # True while store_line has not been emitted in any form yet.
    pending = True
    for line in merged:
        if store_line[0][0] == line[0][0] and \
                store_line[0][2] > line[0][1]:
            # Same chromosome and the held record extends past the start
            # of the incoming one: emit them together.
            yield (store_line[0], (store_line[1][0], line[1][0]))
            pending = False
            if store_line[0][2] == line[0][2] and \
                    store_line[0][1] == line[0][1]:
                # Identical coordinates: both records are consumed, so
                # hold the next record of the merged stream instead.
                try:
                    store_line = next(merged)
                except StopIteration:
                    return
                pending = True
        else:
            yield (store_line[0], (store_line[1][0], None))
            store_line = line
            pending = True
    if pending:
        # The held record was never emitted: flush it on its own.
        yield (store_line[0], (store_line[1][0], None))